Apache Kafka and Streaming Data Processing with Spark Streaming

Hello, Habr! Today we will build a system that processes Apache Kafka message streams using Spark Streaming and writes the processing results to an AWS RDS cloud database.

Let's imagine that a certain credit institution gives us the task of processing incoming transactions "on the fly" across all of its branches. This can be done in order to promptly calculate the open currency position of the treasury, limits or financial results of transactions, and so on.

How can this case be implemented without resorting to magic and spells? Read on below the cut. Let's go!

Introduction

Of course, processing large amounts of data in real time provides ample opportunities for use in modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming, where Kafka creates a stream of incoming message packets, and Spark Streaming processes these packets at a given time interval.

To increase the fault tolerance of the application, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to go back to the last checkpoint and resume calculations from there.

Architecture of the developed system

Components used:

  • Apache Kafka is a distributed publish-subscribe messaging system. It is suitable for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. The Kafka system is built on top of the ZooKeeper synchronization service;
  • Apache Spark Streaming is the Spark component for processing streaming data. The Spark Streaming module is built on a micro-batch architecture, in which the data stream is interpreted as a continuous sequence of small data packets. Spark Streaming takes data from different sources and combines it into small batches. New batches are created at regular intervals. At the beginning of each time interval a new batch is created, and any data received during that interval is included in the batch. At the end of the interval, the batch stops growing. The size of the interval is determined by a parameter called the batch interval;
  • Apache Spark SQL combines relational processing with Spark functional programming. Structured data means data that has a schema, that is, a single set of fields for all records. Spark SQL supports input from a variety of structured data sources and, because schema information is available, it can efficiently retrieve only the required fields of records; it also provides DataFrame APIs;
  • AWS RDS is a relatively inexpensive cloud-based relational database, a web service that simplifies setup, operation and scaling, and is administered directly by Amazon.
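
The micro-batch idea described for Spark Streaming can be illustrated without Spark at all. The following is a minimal pure-Python sketch, not Spark's actual implementation: the function name `split_into_batches` and the sample events are assumptions made for illustration. It assigns each timestamped record to the batch interval in which it arrived.

```python
from collections import defaultdict


def split_into_batches(events, batch_interval):
    """Group (timestamp, payload) pairs into consecutive fixed-length intervals.

    Hypothetical helper for illustration only; Spark Streaming does the
    equivalent work internally when it builds micro-batches.
    """
    batches = defaultdict(list)
    for timestamp, payload in events:
        # Integer division maps a timestamp to the index of its interval
        batch_index = int(timestamp // batch_interval)
        batches[batch_index].append(payload)
    # Return the non-empty batches ordered by interval index
    return [batches[i] for i in sorted(batches)]


# Sample events: (arrival time in seconds, message)
events = [(0.1, 'a'), (0.9, 'b'), (2.0, 'c'), (3.5, 'd'), (4.2, 'e')]
batches = split_into_batches(events, batch_interval=2)
print(batches)
```

With a batch interval of 2 seconds, the first two events fall into the first batch, the next two into the second, and the last one into the third.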

Installing and running the Kafka server

Before using Kafka directly, you need to make sure you have Java, because the JVM is used for its operation:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Let's create a new user to work with Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Next, download the distribution from the official Apache Kafka website:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack the downloaded archive:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz -C /YOUR_PATH
ln -s /YOUR_PATH/kafka_2.12-2.2.0 ~/kafka

The next step is optional. The fact is that the default settings do not allow you to use all the capabilities of Apache Kafka. For example, you cannot delete a topic, category, or group to which messages can be published. To change this, let's edit the configuration file:

vim ~/kafka/config/server.properties

Add the following to the end of the file:

delete.topic.enable = true

Before starting the Kafka server, you need to start the ZooKeeper server; we will use the helper script that comes with the Kafka distribution:

cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

After ZooKeeper has started successfully, start the Kafka server in a separate terminal:

bin/kafka-server-start.sh config/server.properties

Let's create a new topic called transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Let's make sure that a topic with the required number of partitions and replication has been created:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Let's skip testing the producer and consumer for the newly created topic. More details on how you can test sending and receiving messages are given in the official documentation, under Send some messages. We now move on to writing a producer in Python using the KafkaProducer API.

Writing the producer

The producer will generate random data: 100 messages every second. By random data we mean a dictionary consisting of three fields:

  • branch - the name of the credit institution's point of sale;
  • currency - the transaction currency;
  • amount - the transaction amount. The amount is a positive number if the Bank buys currency, and a negative number if it sells.

The producer code looks like this:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict
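
The text mentions 100 messages per second, while the snippet above builds a single message. A minimal sketch of a batch generator follows. It uses the standard library `random` module instead of NumPy so that it runs without extra dependencies, and the names `make_message` and `make_batch` are hypothetical, not taken from the original script.

```python
import random

BRANCHES = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
CURRENCIES = ['RUB', 'USD', 'EUR', 'GBP']


def make_message(rng):
    """Build one random transaction with the three fields described above."""
    return {
        'branch': rng.choice(BRANCHES),
        'currency': rng.choice(CURRENCIES),
        # randrange excludes the upper bound, like numpy.random.randint
        'amount': rng.randrange(-100, 100),
    }


def make_batch(size=100, seed=None):
    """Return a list of `size` random transactions (one second's worth)."""
    rng = random.Random(seed)
    return [make_message(rng) for _ in range(size)]


batch = make_batch(size=100, seed=42)
print(len(batch), batch[0])
```

In a real producer, each message of the batch would be handed to the send call, with a pause between batches to keep the rate at roughly one batch per second.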

Next, using the send method, we send a message to the server, to the topic we need, in JSON format:

from json import dumps
from kafka import KafkaProducer

# Serialize each dictionary to UTF-8 encoded JSON and compress it with gzip
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic=my_topic, value=data)
    # Block until the broker acknowledges the message (at most 10 seconds)
    record_metadata = future.get(timeout=10)

    print('--> The message has been sent to a topic: '
          '{}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))

except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    # Make sure buffered messages are actually delivered
    producer.flush()
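
To make the `value_serializer` step concrete, the sketch below shows the bytes that such a serializer produces for one message and how a consumer could decode them. The `serialize` and `deserialize` names and the sample message are assumptions for illustration; only the standard `json` module is used.

```python
import json


def serialize(message):
    """Mirror of the producer's value_serializer: dict -> UTF-8 JSON bytes."""
    return json.dumps(message).encode('utf-8')


def deserialize(raw):
    """Inverse step a consumer would apply: UTF-8 JSON bytes -> dict."""
    return json.loads(raw.decode('utf-8'))


sample = {'branch': 'Kazan', 'currency': 'USD', 'amount': -25}
raw = serialize(sample)
print(raw)
print(deserialize(raw) == sample)
```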

When we run the script, the terminal shows a confirmation line for each message sent, with its topic, partition and offset.

This means that everything works as we wanted: the producer generates messages and sends them to the topic we need.
The next step is to install Spark and process this message stream.

Installing Apache Spark

Apache Spark is a universal, high-performance cluster computing platform.

Spark outperforms popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed plays an important role when processing large amounts of data, since it is speed that lets you work interactively without spending minutes or hours waiting. One of Spark's greatest strengths, and what makes it so fast, is its ability to perform in-memory computations.

This framework is written in Scala, so you need to install that first:

sudo apt-get install scala

Download the Spark distribution from the official website:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Unpack the archive:

sudo mkdir -p /usr/local/spark
sudo tar xvf spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark --strip-components=1

Add the path to Spark to the bash configuration file:

vim ~/.bashrc

Add the following lines using the editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Run the command below after making changes to bashrc:

source ~/.bashrc

Deploying AWS PostgreSQL

All that remains is to deploy the database into which we will load the processed information from the stream. For this we will use the AWS RDS service.

Go to the AWS console -> AWS RDS -> Databases -> Create database.

Select PostgreSQL and click Next.

Since this example is for educational purposes only, we will use a free server "at the minimum" (Free Tier).

Next, we tick the Free Tier box, after which we are automatically offered an instance of the t2.micro class. Although weak, it is free and quite suitable for our task.

Next come the important settings: the name of the database instance, the name of the master user and the password. Let's name the instance myHabrTest, the master user habr and the password habr12345, and click Next.

The next page contains the parameters responsible for the accessibility of our database server from outside (Public accessibility) and for port availability.

Let's create a new VPC security group setting that allows external access to our database server through port 5432 (PostgreSQL).
In a separate browser window, go to the AWS console, to VPC Dashboard -> Security Groups -> Create security group.

We set the name of the security group (PostgreSQL) and a description, indicate which VPC this group should be associated with, and click Create.

Fill in the Inbound rules for port 5432 for the newly created group. You do not have to enter the port manually; instead, you can select PostgreSQL from the Type drop-down list.

Strictly speaking, the value ::/0 means that incoming traffic to the server is allowed from anywhere in the world, which is not canonically correct, but for the purposes of this example let's allow ourselves this approach.
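
The effect of such a rule can be checked with Python's standard `ipaddress` module. The sketch below is only an illustration: the sample client addresses and the narrower `2001:db8::/32` range (a prefix reserved for documentation) are assumptions, not values from the AWS setup.

```python
import ipaddress

# ::/0 matches every IPv6 address, so the rule admits traffic from anywhere
open_rule = ipaddress.ip_network('::/0')
# A narrower, hypothetical rule that would admit only one address range
restricted_rule = ipaddress.ip_network('2001:db8::/32')

client_inside = ipaddress.ip_address('2001:db8::1')
client_outside = ipaddress.ip_address('2a00:1450::200e')

# Both clients match the open rule
print(client_inside in open_rule, client_outside in open_rule)
# Only the first client matches the restricted rule
print(client_inside in restricted_rule, client_outside in restricted_rule)
```

For anything beyond a training example, a restricted source range of this kind is the safer choice.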

We return to the browser page where "Configure advanced settings" is open and, in the VPC security groups section, select Choose existing VPC security groups -> PostgreSQL.

Next, under Database options -> Database name, set the name: habrDB.

We can leave the remaining parameters at their defaults, except for disabling backups (backup retention period: 0 days), monitoring and Performance Insights. Click the Create database button.

Stream handler

The final stage is the development of a Spark job that processes new data arriving from Kafka every two seconds and writes the result to the database.

As noted above, checkpoints are a core mechanism in Spark Streaming that must be configured to ensure fault tolerance. We will use checkpoints and, if the procedure fails, the Spark Streaming module only needs to return to the last checkpoint and resume calculations from it to recover the lost data.
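
The recovery idea can be illustrated with a small pure-Python sketch. It is not how Spark Streaming implements checkpoints internally; the `process_with_checkpoint` function, the JSON checkpoint file and the simulated failure are all assumptions made for illustration. The processor records the position of the last handled message, and after a failure it resumes from that position.

```python
import json
import os
import tempfile


def load_offset(path):
    """Return the last saved offset, or 0 if no checkpoint exists yet."""
    if not os.path.exists(path):
        return 0
    with open(path) as f:
        return json.load(f)['offset']


def process_with_checkpoint(messages, path, fail_at=None):
    """Process messages from the checkpointed offset, saving progress as we go."""
    processed = []
    for offset in range(load_offset(path), len(messages)):
        if offset == fail_at:
            raise RuntimeError('simulated failure at offset {}'.format(offset))
        processed.append(messages[offset])
        # Persist the position of the next message to read
        with open(path, 'w') as f:
            json.dump({'offset': offset + 1}, f)
    return processed


messages = ['m0', 'm1', 'm2', 'm3', 'm4']
checkpoint_path = os.path.join(tempfile.mkdtemp(), 'checkpoint.json')

try:
    process_with_checkpoint(messages, checkpoint_path, fail_at=3)
except RuntimeError as error:
    print(error)

# The second run starts where the first one stopped
recovered = process_with_checkpoint(messages, checkpoint_path)
print(recovered)
```

The first run handles three messages before failing; the second run picks up the remaining two instead of starting over.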

Checkpointing can be enabled by setting a directory on a fault-tolerant, reliable file system (such as HDFS, S3, etc.) in which the checkpoint information will be stored. This is done using, for example:

streamingContext.checkpoint(checkpointDirectory)

In our example, we will use the following approach: if checkpointDirectory exists, the context is recreated from the checkpoint data. If the directory does not exist (that is, this is the first run), functionToCreateContext is called to create a new context and configure the DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

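We create a DirectStream object to connect to the "transaction" topic using the createDirectStream method of the KafkaUtils library:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# The application name here is an arbitrary choice
conf = SparkConf().setAppName('kafka_spark_streaming')
sc = SparkContext(conf=conf)
# Batch interval of 2 seconds
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                  [topic],
                                                  {"metadata.broker.list": broker_list})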

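Parsing the incoming data in JSON format:

from pyspark.sql import Row

# rdd holds the parsed messages of one batch; spark is the active SparkSession
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                               currency=w['currency'],
                               amount=w['amount']))

testDataFrame = spark.createDataFrame(rowRdd)
# Register the batch as a temporary view so it can be queried with SQL
testDataFrame.createOrReplaceTempView("treasury_stream")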
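
The snippet above assumes that each element of `rdd` is already a dictionary. With the direct Kafka stream, records arrive as `(key, value)` pairs whose value is the JSON text sent by the producer, so a parsing step is needed first. A minimal sketch of such a step follows; the function name `parse_record` and the sample record are hypothetical.

```python
import json


def parse_record(record):
    """Turn one (key, value) Kafka record into a dict with typed fields."""
    _key, value = record
    data = json.loads(value)
    return {
        'branch': str(data['branch']),
        'currency': str(data['currency']),
        'amount': int(data['amount']),
    }


# A record shaped like the pairs the direct stream delivers (key may be None)
record = (None, '{"branch": "SPB", "currency": "EUR", "amount": 40}')
parsed = parse_record(record)
print(parsed)
```

In the Spark job, a function like this could be applied with `directKafkaStream.map(parse_record)` before the rows are built.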

Using Spark SQL, we do a simple grouping and output the result to the console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Getting the query text and running it through Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
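
The article does not show the body of `get_sql_query`. A plausible sketch, assuming it simply returns the query text shown earlier as a string, might look like this.

```python
def get_sql_query():
    """Return the aggregation query as a string (assumed implementation)."""
    return """
        select
            from_unixtime(unix_timestamp()) as curr_time,
            t.branch                        as branch_name,
            t.currency                      as currency_code,
            sum(amount)                     as batch_value
        from treasury_stream t
        group by
            t.branch,
            t.currency
    """


sql_query = get_sql_query()
print(sql_query)
```

Keeping the query in its own function makes it easy to change the aggregation without touching the streaming code.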

We then save the resulting aggregated data to a table in AWS RDS. To save the aggregation results to a database table, we use the write method of the DataFrame object:

testResultDataFrame.write \
    .format("jdbc") \
    .mode("append") \
    .option("driver", 'org.postgresql.Driver') \
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \
    .option("dbtable", "transaction_flow") \
    .option("user", "habr") \
    .option("password", "habr12345") \
    .save()

A few words about setting up the connection to AWS RDS. We created the user and password for it in the "Deploying AWS PostgreSQL" step. As the database server URL you should use the Endpoint, which is displayed in the Connectivity & security section.
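
The JDBC URL is just the endpoint, port and database name joined in a fixed pattern. The sketch below builds it from its parts and reads the credentials from environment variables rather than hard-coding them. The function name `build_jdbc_options`, the variable names `RDS_USER` and `RDS_PASSWORD`, and the placeholder endpoint are assumptions, not part of the original script.

```python
import os


def build_jdbc_options(endpoint, database, table, port=5432):
    """Assemble the option dictionary for a PostgreSQL JDBC write."""
    return {
        'driver': 'org.postgresql.Driver',
        'url': 'jdbc:postgresql://{}:{}/{}'.format(endpoint, port, database),
        'dbtable': table,
        # Hypothetical environment variable names; empty string if unset
        'user': os.environ.get('RDS_USER', ''),
        'password': os.environ.get('RDS_PASSWORD', ''),
    }


options = build_jdbc_options(
    endpoint='example-endpoint.rds.amazonaws.com',  # placeholder endpoint
    database='habrDB',
    table='transaction_flow',
)
print(options['url'])
```

A dictionary like this could then be passed to the DataFrame writer with `.options(**options)` in place of the individual `.option(...)` calls.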

To connect Spark and Kafka correctly, you should run the job via spark-submit using the spark-streaming-kafka-0-8_2.11 artifact. In addition, we will also use an artifact for interacting with the PostgreSQL database; we will pass both via --packages.

For flexibility of the script, we will also pass the name of the message server and the topic from which we want to receive data as input parameters.
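
One way to accept these two parameters is the standard `argparse` module. This is a sketch under assumptions: the original script may read `sys.argv` directly, and the argument names `broker_list` and `topic` are chosen here to match the variables used earlier.

```python
import argparse


def parse_args(argv=None):
    """Parse the broker address and topic name from the command line."""
    parser = argparse.ArgumentParser(description='Spark Streaming job for Kafka')
    parser.add_argument('broker_list', help='Kafka broker, e.g. localhost:9092')
    parser.add_argument('topic', help='Kafka topic to read from')
    return parser.parse_args(argv)


# Passing a list makes the function easy to try without a real command line
args = parse_args(['localhost:9092', 'transaction'])
print(args.broker_list, args.topic)
```

When `argv` is left as `None`, `argparse` reads the arguments that follow the script name on the command line.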

So, it's time to launch the system and check that it works:

spark-submit \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\
org.postgresql:postgresql:9.4.1207 \
spark_job.py localhost:9092 transaction

Everything worked! While the application is running, new aggregation results are output every 2 seconds, because we set the batch interval to 2 seconds when we created the StreamingContext object.

Next, we run a simple query against the database to check that records are present in the transaction_flow table.

Conclusion

This article looked at an example of stream processing of information using Spark Streaming in conjunction with Apache Kafka and PostgreSQL. With the growth of data from various sources, it is difficult to overestimate the practical value of Spark Streaming for building streaming and real-time applications.

You can find the full source code in my repository on GitHub.

I am happy to discuss this article, I look forward to your comments, and I also hope for constructive criticism from all interested readers.

I wish you success!

P.S. Initially it was planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the entire system described above in AWS using AWS Kinesis and AWS EMR. Stay tuned!

Source: www.habr.com