Apache Kafka and Streaming Data Processing with Spark Streaming

Hello, Habr! Today we will build a system that processes Apache Kafka message streams using Spark Streaming and writes the processing results to an AWS RDS cloud database.

Suppose a certain credit institution sets us the task of processing incoming transactions "on the fly" across all its branches. This may be needed to promptly calculate the open currency position for the treasury, limits or financial results for transactions, and so on.

How to implement this case without magic or sorcery: read on under the cut. Let's go!

Introduction

Processing large volumes of data in real time opens up many possibilities for modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming, where Kafka creates a stream of incoming message packets and Spark Streaming processes these packets at a given time interval.

To increase the fault tolerance of the application, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to go back to the last checkpoint and resume calculations from there.

Architecture of the developed system

Components used:

  • Apache Kafka is a distributed publish-subscribe messaging system. It is suitable for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. Kafka is built on top of the ZooKeeper synchronization service;
  • Apache Spark Streaming is the Spark component for processing streaming data. The Spark Streaming module is built on a micro-batch architecture, where the data stream is treated as a continuous sequence of small data packets. Spark Streaming takes data from different sources and combines it into small packets. New packets are created at regular intervals. At the beginning of each interval a new packet is created, and any data received during that interval is included in the packet. At the end of the interval, the packet stops growing. The length of the interval is set by a parameter called the batch interval;
  • Apache Spark SQL combines relational processing with Spark's functional programming. Structured data means data that has a schema, that is, a single set of fields for all records. Spark SQL supports input from a variety of structured data sources and, because schema information is available, it can efficiently retrieve only the required fields of records. It also provides the DataFrame APIs;
  • AWS RDS is a relatively inexpensive cloud-based relational database, a web service that simplifies setup, operation and scaling, and is administered directly by Amazon.
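
The micro-batch idea from the Spark Streaming item above can be sketched in plain Python. This is only an illustration of how a batch interval slices a stream into packets, not how Spark implements it; the function name split_into_batches and the sample events are assumptions made for this example.

```python
from collections import defaultdict


def split_into_batches(events, batch_interval):
    """Group (timestamp, payload) events into consecutive fixed-length batches.

    An event with timestamp t lands in batch number int(t // batch_interval),
    so batch 0 covers [0, batch_interval), batch 1 the next interval, and so on.
    """
    batches = defaultdict(list)
    for timestamp, payload in events:
        batches[int(timestamp // batch_interval)].append(payload)
    # Return batches in time order; in this sketch, empty intervals are skipped
    return [batches[index] for index in sorted(batches)]


# Hypothetical events: (arrival time in seconds, message)
events = [(0.1, 'a'), (0.7, 'b'), (2.3, 'c'), (3.9, 'd'), (4.0, 'e')]
batches = split_into_batches(events, batch_interval=2)
print(batches)
```

With a batch interval of 2 seconds, the first two events fall into the first packet, the next two into the second, and the event at exactly 4.0 seconds opens a third.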

Installing and running the Kafka server

Before using Kafka directly, you need to make sure you have Java installed, because Kafka runs on the JVM:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Let's create a new user to work with Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Next, download the distribution from the official Apache Kafka website:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack the downloaded archive and create a symbolic link to it in the home directory:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz -C /YOUR_PATH
ln -s /YOUR_PATH/kafka_2.12-2.2.0 ~/kafka

The next step is optional. The default settings do not let you use all the capabilities of Apache Kafka. For example, you cannot delete a topic, that is, the category or group to which messages can be published. To change this, let's edit the configuration file:

vim ~/kafka/config/server.properties

Add the following to the end of the file:

delete.topic.enable = true

Before starting the Kafka server, you need to start the ZooKeeper server; we will use the helper script that ships with the Kafka distribution:

cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

After ZooKeeper has started successfully, start the Kafka server in a separate terminal:

bin/kafka-server-start.sh config/server.properties

Let's create a new topic named transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Let's make sure that the topic was created with the required number of partitions and the required replication factor:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

We will skip testing the producer and consumer for the newly created topic. How to test sending and receiving messages is described in the official documentation (see "Send some messages"). Instead, let's move on to writing a producer in Python using the KafkaProducer API.

Writing the producer

The producer will generate random data: 100 messages every second. By random data we mean a dictionary with three fields:

  • Branch - the name of the credit institution's point of sale;
  • Currency - the transaction currency;
  • Amount - the transaction amount. The amount is a positive number if the Bank buys currency, and a negative number if it sells.

The producer code looks like this:

from numpy.random import choice, randint

def get_random_value():
    """Return one random transaction as a dictionary."""
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    # Cast NumPy scalars to built-in types so that json.dumps can serialize them
    new_dict['branch'] = str(choice(branch_list))
    new_dict['currency'] = str(choice(currency_list))
    new_dict['amount'] = int(randint(-100, 100))

    return new_dict
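
Since each dictionary travels to Kafka as JSON, it can help to see what one message looks like as bytes. The following is a minimal sketch using only the standard library; the helper names serialize and deserialize and the sample values are assumptions made for this example, not part of the original job.

```python
from json import dumps, loads


def serialize(message):
    """Encode a dictionary as UTF-8 JSON bytes."""
    return dumps(message).encode('utf-8')


def deserialize(payload):
    """Decode UTF-8 JSON bytes back into a dictionary."""
    return loads(payload.decode('utf-8'))


# Hypothetical sample message with the three fields described above
sample = {'branch': 'Kazan', 'currency': 'USD', 'amount': -42}
payload = serialize(sample)
print(payload)
print(deserialize(payload) == sample)
```

The consumer side has to apply the reverse transformation, which is why the Spark job later parses the incoming data as JSON.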

Next, using the send method, we send a message in JSON format to the server, to the topic we need:

from json import dumps

from kafka import KafkaProducer

# Serialize every message as UTF-8 encoded JSON and compress it with gzip
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic=my_topic, value=data)
    # Block until the broker confirms the write (at most 10 seconds)
    record_metadata = future.get(timeout=10)

    print('--> The message has been sent to a topic: '
          '{}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))

except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()
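
The snippet above sends a single message, while the stated goal is 100 messages per second. One possible way to pace the sending is sketched below. The function send_at_rate and its parameters are hypothetical, and the demo uses a plain list instead of a broker so that it runs anywhere.

```python
import time


def send_at_rate(send, make_message, per_second, seconds, sleep=time.sleep):
    """Call send(make_message()) per_second times in each one-second window."""
    sent = 0
    for _ in range(seconds):
        started = time.monotonic()
        for _ in range(per_second):
            send(make_message())
            sent += 1
        # Sleep for whatever remains of the current one-second window
        remaining = 1.0 - (time.monotonic() - started)
        if remaining > 0:
            sleep(remaining)
    return sent


# Stand-ins so the sketch runs without Kafka: messages are appended to a list
# and the sleep is replaced by a no-op.
outbox = []
count = send_at_rate(outbox.append,
                     lambda: {'branch': 'Kazan', 'currency': 'USD', 'amount': 1},
                     per_second=100, seconds=2, sleep=lambda s: None)
print(count)
```

With a real producer, the first argument could be something like lambda m: producer.send('transaction', value=m) and the second could be get_random_value.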

When we run the script, the terminal shows a confirmation for each message sent, with its topic, partition and offset.

This means that everything works as we wanted: the producer generates messages and sends them to the topic we need.
The next step is to install Spark and process this message stream.

Installing Apache Spark

Apache Spark is a general-purpose, high-performance cluster computing platform.

Spark outperforms popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed matters when processing large amounts of data, because it is speed that lets you work interactively without spending minutes or hours waiting. One of Spark's greatest strengths, and the reason it is so fast, is its ability to perform computations in memory.

The framework is written in Scala, so you need to install Scala first:

sudo apt-get install scala

Download the Spark distribution from the official website:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Unpack the archive:

sudo mkdir -p /usr/local/spark
sudo tar xvf spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark --strip-components=1

Add the path to Spark to the bash configuration file:

vim ~/.bashrc

Add the following lines in the editor:

export SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Run the command below after making the changes to ~/.bashrc:

source ~/.bashrc

Deploying AWS PostgreSQL

All that remains is to deploy the database into which we will load the processed information from the streams. For this we will use the AWS RDS service.

Go to the AWS console -> AWS RDS -> Databases -> Create database.

Select PostgreSQL and click Next.

Since this example is for educational purposes only, we will use a free server "at minimum" (Free Tier).

Next, we tick the Free Tier box, after which we are automatically offered an instance of class t2.micro. Although it is weak, it is free and quite suitable for our task.

Next come the very important things: the database instance name, the master user name and the master password. Let's name the instance myHabrTest, the master user habr and the password habr12345, then click the Next button.

The next page contains the parameters responsible for the accessibility of our database server from outside (Public accessibility) and for port availability.

Let's create a new VPC security group setting that allows external access to our database server via port 5432 (PostgreSQL).
In a separate browser window, go to the AWS console, then VPC Dashboard -> Security Groups -> Create security group.

We set the name of the security group (PostgreSQL) and a description, indicate which VPC this group should be associated with, and click the Create button.

Fill in the Inbound rules for port 5432 for the newly created group. You do not have to specify the port manually; instead, select PostgreSQL from the Type drop-down list.

Strictly speaking, the value ::/0 means that incoming traffic to the server is allowed from all over the world, which is not quite right from a security standpoint, but for the purposes of this example let's allow ourselves this approach.

We return to the browser page where "Configure advanced settings" is open, and in the VPC security groups section select Choose existing VPC security groups -> PostgreSQL.

Next, in Database options -> Database name, set the name: habrDB.

We can leave the remaining parameters at their defaults, except that we disable backup (backup retention period - 0 days), monitoring and Performance Insights. Click the Create database button.

Stream handler

The final stage is the development of a Spark job that will process new data arriving from Kafka every two seconds and write the result to the database.

As noted above, checkpoints are a core mechanism in Spark Streaming that must be configured to ensure fault tolerance. We will use checkpoints and, if the procedure fails, the Spark Streaming module will only need to return to the last checkpoint and resume calculations from it to recover the lost data.

Checkpointing can be enabled by setting a directory on a fault-tolerant, reliable file system (such as HDFS, S3, etc.) in which the checkpoint information will be saved. This is done using, for example:

streamingContext.checkpoint(checkpointDirectory)

In our example, we will use the following approach: if checkpointDirectory exists, the context is recreated from the checkpoint data. If the directory does not exist (that is, the job is being run for the first time), then functionToCreateContext is called to create a new context and configure the DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
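
The "restore if a checkpoint exists, otherwise create" logic can be illustrated with a plain-Python analogy. This is not how Spark stores its checkpoints, and it does not use the Spark API; the helpers get_or_create and save_checkpoint, the file name state.json and the state dictionary are all assumptions made for this sketch.

```python
import json
import os
import tempfile

STATE_FILE = 'state.json'  # hypothetical file name used only in this sketch


def save_checkpoint(checkpoint_dir, state):
    """Persist the current state so that a restarted process can pick it up."""
    os.makedirs(checkpoint_dir, exist_ok=True)
    with open(os.path.join(checkpoint_dir, STATE_FILE), 'w') as f:
        json.dump(state, f)


def get_or_create(checkpoint_dir, create_state):
    """Restore state from checkpoint_dir if present, otherwise create it."""
    path = os.path.join(checkpoint_dir, STATE_FILE)
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)   # recovery: create_state is never called
    state = create_state()        # first run: build a fresh state
    save_checkpoint(checkpoint_dir, state)
    return state


with tempfile.TemporaryDirectory() as tmp:
    checkpoint_dir = os.path.join(tmp, 'checkpoint')

    # First run: no checkpoint yet, so the factory function is used
    first = get_or_create(checkpoint_dir, lambda: {'batches_done': 0})
    first['batches_done'] = 5
    save_checkpoint(checkpoint_dir, first)

    # Simulated restart: the state comes from the checkpoint, not the factory
    second = get_or_create(checkpoint_dir, lambda: {'batches_done': 0})

print(second)
```

The point of the pattern is that the factory function runs only once; after a failure, the restarted job continues from the saved progress instead of starting from scratch.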

We create a DirectStream object to connect to the "transaction" topic using the createDirectStream method of the KafkaUtils library:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# conf is assumed to be a SparkConf object created earlier in the job
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)  # batch interval of 2 seconds

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

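Parsing the incoming data in JSON format:

from pyspark.sql import Row

# rdd is assumed to hold the already parsed messages of the current batch,
# and spark is assumed to be the SparkSession used by the job
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                               currency=w['currency'],
                               amount=w['amount']))

testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")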
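
The step that turns raw Kafka records into dictionaries is not shown above. As a rough sketch, assuming each record arrives as a (key, value) pair whose value is the JSON text written by the producer, the parsing could look like this in plain Python. The Transaction tuple and the parse_record helper are hypothetical names introduced for this example.

```python
import json
from collections import namedtuple

# Plain-Python stand-in for a row with the three fields of a transaction
Transaction = namedtuple('Transaction', ['branch', 'currency', 'amount'])


def parse_record(record):
    """Turn one (key, value) record into a Transaction; value is JSON text."""
    _key, value = record
    fields = json.loads(value)
    return Transaction(fields['branch'], fields['currency'], int(fields['amount']))


# Hypothetical records shaped like the messages the producer sends
records = [(None, '{"branch": "SPB", "currency": "EUR", "amount": 15}'),
           (None, '{"branch": "Kazan", "currency": "RUB", "amount": -7}')]
rows = [parse_record(r) for r in records]
print(rows)
```

In the Spark job the same function could be applied with a map over the stream before the rows are converted to a DataFrame.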

Using Spark SQL, we do a simple grouping and display the result in the console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Getting the query text and running it through Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Then we save the aggregated data to a table in AWS RDS. To save the aggregation results to a database table, we use the write method of the DataFrame object:

testResultDataFrame.write \
    .format("jdbc") \
    .mode("append") \
    .option("driver", 'org.postgresql.Driver') \
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \
    .option("dbtable", "transaction_flow") \
    .option("user", "habr") \
    .option("password", "habr12345") \
    .save()

A few words about setting up the connection to AWS RDS. We created the user and password for it in the "Deploying AWS PostgreSQL" step. As the database server URL you should use the Endpoint, which is displayed in the Connectivity & security section of the AWS console.
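
As a small illustration of how the Endpoint fits into the connection settings, the sketch below assembles the JDBC options from their parts. The helper jdbc_options and the environment variable RDS_PASSWORD are hypothetical; reading the password from the environment is just one way to keep it out of the source code.

```python
import os


def jdbc_options(endpoint, database, table, user, port=5432):
    """Build the JDBC options for a PostgreSQL database as a dictionary."""
    return {
        'driver': 'org.postgresql.Driver',
        'url': 'jdbc:postgresql://{}:{}/{}'.format(endpoint, port, database),
        'dbtable': table,
        'user': user,
        # RDS_PASSWORD is a hypothetical variable name used for this sketch
        'password': os.environ.get('RDS_PASSWORD', ''),
    }


opts = jdbc_options('myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com',
                    'habrDB', 'transaction_flow', 'habr')
print(opts['url'])
```

A dictionary like this could then be passed to the DataFrame writer in one call, for example with .options(**opts), instead of repeating .option for every key.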

To connect Spark and Kafka correctly, you should run the job via spark-submit using the artifact spark-streaming-kafka-0-8_2.11. In addition, we will use an artifact for interacting with the PostgreSQL database; we will pass both via --packages.

To keep the script flexible, we will also pass the name of the message server and the topic from which we want to receive data as input parameters.
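
Reading those two input parameters inside the script could look roughly like this. It is a sketch using argparse from the standard library; the function name parse_args and the argument names broker_list and topic are assumptions, not necessarily what the original job uses.

```python
import argparse


def parse_args(argv):
    """Parse the broker address and the topic name from the command line."""
    parser = argparse.ArgumentParser(description='Spark Streaming job parameters')
    parser.add_argument('broker_list', help='Kafka broker(s), e.g. localhost:9092')
    parser.add_argument('topic', help='Kafka topic to read from')
    return parser.parse_args(argv)


# In the real script this would be parse_args(sys.argv[1:])
args = parse_args(['localhost:9092', 'transaction'])
print(args.broker_list, args.topic)
```

Positional arguments keep the command line short, at the cost of having to remember their order.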

So, it's time to launch the system and check that it works:

spark-submit \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\
org.postgresql:postgresql:9.4.1207 \
spark_job.py localhost:9092 transaction

Everything worked! While the application is running, new aggregation results are output every 2 seconds, because we set the batch interval to 2 seconds when we created the StreamingContext object.

Next, we run a simple query against the database to check that records are present in the transaction_flow table.

Conclusion

This article looked at an example of stream processing of information using Spark Streaming together with Apache Kafka and PostgreSQL. As data from various sources keeps growing, it is hard to overestimate the practical value of Spark Streaming for building streaming and real-time applications.

You can find the full source code in my repository on GitHub.

I am happy to discuss this article, I look forward to your comments, and I also hope for constructive criticism from all caring readers.

I wish you success!

P.S. Initially it was planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the whole system described above in AWS using AWS Kinesis and AWS EMR. Stay tuned!

Source: www.habr.com
