Apache Kafka and Streaming Data Processing with Spark Streaming

Hello, Habr! Today we will build a system that processes Apache Kafka message streams using Spark Streaming and writes the processing results to an AWS RDS cloud database.

Let's imagine that a certain credit organization has given us the task of processing incoming transactions "on the fly" across all of its branches. This could be done, for example, to promptly calculate the open currency position for the treasury, limits or financial results for deals, and so on.

How to implement this case without magic and magic spells — read under the cut! Let's go!


Introduction

Of course, processing large volumes of data in real time opens up plenty of opportunities for modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming, where Kafka produces a stream of incoming message packets and Spark Streaming processes those packets at a given time interval.

To improve the fault tolerance of the application, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to go back to the last checkpoint and resume the computations from there.

Architecture of the developed system


Components used:

  • Apache Kafka is a distributed publish-subscribe messaging system, suitable for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. Kafka is built on top of the ZooKeeper synchronization service;
  • Apache Spark Streaming is the Spark component for processing streaming data. The Spark Streaming module is built on a micro-batch architecture, where the data stream is interpreted as a continuous sequence of small data packets. Spark Streaming takes data from different sources and combines it into small packets, which are created at regular intervals. At the beginning of each interval a new packet is created, and any data received during that interval is included in it; at the end of the interval the packet stops growing. The size of the interval is determined by a parameter called the batch interval;
  • Apache Spark SQL combines relational processing with Spark's functional programming. Structured data means data that has a schema, that is, a single set of fields for all records. Spark SQL supports input from a variety of structured data sources and, because the schema is available, can efficiently read only the fields that are actually needed; it also provides the DataFrame APIs;
  • AWS RDS is a relatively inexpensive cloud-based relational database, a web service that simplifies setup, operation and scaling, and is administered directly by Amazon.

Installing and running the Kafka server

Before using Kafka directly, make sure that Java is installed, since the JVM is required for it to run:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Let's create a new user to work with Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Next, download the distribution from the official Apache Kafka website:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack the downloaded archive:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

The next step is optional. The point is that the default settings do not let you use all of Apache Kafka's capabilities — for example, deleting a topic, category or group to which messages can be published. To change this, let's edit the configuration file:

vim ~/kafka/config/server.properties

Add the following line to the end of the file:

delete.topic.enable = true
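With this option enabled, a topic can later be deleted with the script bundled with Kafka, for example (the topic name here is just an illustration):

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic some_old_topic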

Before starting the Kafka server, you need to start the ZooKeeper server; we will use the helper script that ships with the Kafka distribution:

cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

After ZooKeeper has started successfully, start the Kafka server in a separate terminal:

bin/kafka-server-start.sh config/server.properties

Let's create a new topic called transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Let's make sure that the topic was created with the specified number of partitions and the specified replication factor:

bin/kafka-topics.sh --describe --zookeeper localhost:2181


We will skip testing the producer and consumer against the newly created topic; details on how to test sending and receiving messages are described in the official documentation — Send some messages. Instead, we move on to writing a producer in Python using the KafkaProducer API.
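For a quick manual smoke test, the console scripts that ship with Kafka can be used, each in its own terminal, roughly like this:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic transaction
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic transaction --from-beginning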

Writing the producer

The producer will generate random data: 100 messages every second. By random data we mean a dictionary consisting of three fields:

  • branch — the name of the bank's point of sale;
  • currency — the deal currency;
  • amount — the deal amount. The amount will be a positive number if the bank buys currency, and a negative number if the bank sells it.

The producer code looks like this:

from numpy.random import choice, randint

def get_random_value():
    # build one random "deal" record: branch, currency and amount
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Next, using the send method, we send a message to the server, to the topic we need, in JSON format:

from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: {}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()
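To reach the stated rate of roughly 100 messages per second, the send call above can simply be wrapped in a loop. A minimal sketch, reusing the producer, my_topic and get_random_value defined earlier:

from time import sleep

while True:
    # send a burst of 100 random deals, then wait about a second
    for _ in range(100):
        producer.send(topic=my_topic, value=get_random_value())
    producer.flush()
    sleep(1)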

When we run the script, the following messages appear in the terminal:


This means that everything works as intended: the producer generates and sends messages to the topic we need.
The next step is to install Spark and process this message stream.

Installing Apache Spark

Apache Spark is a universal, high-performance cluster computing platform.

Spark performs better than popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed plays an important role when processing large volumes of data, since it is speed that lets you work interactively without spending minutes or hours waiting. One of Spark's biggest strengths, and the one that makes it so fast, is its ability to perform computations in memory.

The framework is written in Scala, so you need to install Scala first:

sudo apt-get install scala

Download the Spark distribution from the official website:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Unpack the archive:

sudo tar xvf spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Add the path to Spark to the bash profile:

vim ~/.bashrc

Add the following lines using the editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Run the command below after making the changes to bashrc:

source ~/.bashrc

Deploying AWS PostgreSQL

All that remains is to deploy the database into which we will load the processed results from the streams. We will use the AWS RDS service for this.

Go to the AWS console -> AWS RDS -> Databases -> Create database.

Select PostgreSQL and click Next.

Since this example is purely educational, we will use the free "minimal" server (Free Tier).

Next, tick the Free Tier checkbox, after which we are automatically offered an instance of the t2.micro class — weak, but free and perfectly suitable for our task.

Next come some very important settings: the database instance name, the master user name and the password. Let's name the instance myHabrTest, master user: habr, password: habr12345, and click the Next button.

The next page contains the parameters responsible for access to our database server from the outside (Public accessibility) and for port availability.


Let's create a new setting for the VPC security group, which will allow external access to our database on port 5432 (PostgreSQL).
In a separate browser window, go to the AWS console, to the VPC Dashboard -> Security Groups -> Create security group section.

Set the name of the Security group — PostgreSQL — add a description, specify which VPC the group should be associated with, and click the Create button.

Fill in the Inbound rules for port 5432 for the newly created group. There is no need to enter the port manually — just select PostgreSQL from the Type drop-down list.

Strictly speaking, the value ::/0 means that the server is reachable from anywhere in the world, which is canonically not quite correct, but for the sake of this example let's allow ourselves this approach.

We return to the browser page where "Configure advanced settings" is open and select VPC security groups -> Choose existing VPC security groups -> PostgreSQL.

Next, in Database options -> Database name, set the name — habrDB.

We can leave the remaining parameters at their defaults, except for disabling backups (backup retention period — 0 days), monitoring and Performance Insights. Click the Create database button.
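For reference, roughly the same instance could also be created from the AWS CLI instead of the console — a sketch using the values chosen above (the security group id is a placeholder):

aws rds create-db-instance \
    --db-instance-identifier myhabrtest \
    --db-instance-class db.t2.micro \
    --engine postgres \
    --allocated-storage 20 \
    --db-name habrDB \
    --master-username habr \
    --master-user-password habr12345 \
    --publicly-accessible \
    --vpc-security-group-ids sg-xxxxxxxx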

Stream handler

The final stage is to develop the Spark job, which every two seconds will process the new data coming from Kafka and write the result to the database.

As noted above, checkpoints are the core mechanism in Spark Streaming that must be configured to provide fault tolerance. We will use checkpoints, and if the procedure fails, the Spark Streaming module will only need to return to the last checkpoint and resume computations from there in order to recover the lost data.

Checkpointing is enabled by setting a directory on a fault-tolerant, reliable file system (such as HDFS, S3, etc.) in which the checkpoint information will be stored. This is done with, for example:

streamingContext.checkpoint(checkpointDirectory)

In our example we will use the following approach: if checkpointDirectory exists, the context is recreated from the checkpoint data; if the directory does not exist (i.e. this is the first run), functionToCreateContext is called to create a new context and set up the DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
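If you are wondering what such a context-creating function might look like, here is a minimal sketch (the checkpoint path and the application name are assumptions, not values from this project):

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

checkpointDirectory = "/tmp/spark_checkpoint"   # example path; use HDFS/S3 in production

def functionToCreateContext():
    # called only when no checkpoint data exists yet (i.e. on the very first run)
    conf = SparkConf().setAppName("habr-transaction-stream")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 2)               # 2-second batch interval
    # ... define the Kafka DStream and the per-batch processing here ...
    ssc.checkpoint(checkpointDirectory)         # enable checkpointing
    return ssc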

We create a DirectStream object to connect to the transaction topic using the createDirectStream method of the KafkaUtils library:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# minimal Spark configuration; the application name here is just an example
conf = SparkConf().setAppName("habr-transaction-stream")

sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

We parse the incoming JSON data and convert each record into a Row for Spark SQL:

from pyspark.sql import Row

# this runs for every micro-batch (e.g. inside a function passed to foreachRDD);
# `rdd` holds the deserialized transaction dictionaries, `spark` is the active SparkSession
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                               currency=w['currency'],
                               amount=w['amount']))

testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Using Spark SQL, we do a simple grouping and print the result to the console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

We get the query text and run it through Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Then we save the resulting aggregated data into a table in AWS RDS. To save the aggregation results to the database table, we use the write method of the DataFrame object:

testResultDataFrame.write \
    .format("jdbc") \
    .mode("append") \
    .option("driver", 'org.postgresql.Driver') \
    .option("url", "jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \
    .option("dbtable", "transaction_flow") \
    .option("user", "habr") \
    .option("password", "habr12345") \
    .save()
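To tie everything together, the per-batch steps above (building the DataFrame, running the aggregation and writing to RDS) are wrapped in a function that is attached to the stream, after which the streaming context is started. A hedged sketch — the function name process is an assumption:

def process(rdd):
    # skip empty micro-batches
    if rdd.isEmpty():
        return
    # ... build rowRdd / testDataFrame, run the SQL aggregation
    #     and write testResultDataFrame to PostgreSQL as shown above ...

parsed_stream.foreachRDD(process)

ssc.start()             # start receiving and processing the data
ssc.awaitTermination()  # block until the job is stopped or fails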

A few words about setting up the connection to AWS RDS. We created the user and password for it in the "Deploying AWS PostgreSQL" step. As the database server url you should use the Endpoint, which is shown in the Connectivity & security section.


To connect Spark and Kafka correctly, the job should be run via spark-submit with the spark-streaming-kafka-0-8_2.11 artifact. We will also use an artifact for interacting with the PostgreSQL database; we pass both of them via --packages.

To make the script flexible, we will also take the message server name and the topic we want to receive data from as input parameters.
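One simple way to do this is to read them from sys.argv, matching the spark-submit invocation below (a sketch):

import sys

# usage: spark_job.py <broker_list> <topic>, e.g. spark_job.py localhost:9092 transaction
broker_list = sys.argv[1]
topic = sys.argv[2]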

So, it is time to launch the system and check that it works:

spark-submit \
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,org.postgresql:postgresql:9.4.1207 \
    spark_job.py localhost:9092 transaction

Everything worked! While the application is running, new aggregation results are printed every 2 seconds, because we set the batch interval to 2 seconds when we created the StreamingContext object.


Next, we run a simple query against the database to check that there are records in the transaction_flow table.
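For example, with any SQL client connected to habrDB (a minimal sketch):

select * from transaction_flow order by curr_time desc limit 10;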


Conclusion

In this article we looked at an example of stream processing using Spark Streaming together with Apache Kafka and PostgreSQL. With data volumes growing from a variety of sources, it is hard to overestimate the practical value of Spark Streaming for building streaming and real-time applications.

You can find the full source code in my GitHub repository.

I am happy to discuss this article, I look forward to your comments, and I also hope for constructive criticism from all interested readers.

I wish you success!

P.S. Initially it was planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the entire pipeline described above in AWS using AWS Kinesis and AWS EMR. Stay tuned!

Source: www.hab.com
