Apache Kafka ak Streaming Data Processing ak Spark Streaming

Bonjou, Habr! Jodi a nou pral bati yon sistèm ki pral trete kouran mesaj Apache Kafka lè l sèvi avèk Spark Streaming epi ekri rezilta pwosesis yo nan baz done nwaj AWS RDS la.

Ann imajine ke yon sèten enstitisyon kredi mete nou travay nan trete tranzaksyon fèk ap rantre "sou vole" atravè tout branch li yo. Sa a ka fè nan bi pou yo kalkile san pèdi tan yon pozisyon lajan ouvè pou trezò a, limit oswa rezilta finansye pou tranzaksyon, elatriye.

Ki jan yo aplike ka sa a san yo pa itilize nan majik ak òneman majik - li anba koupe a! Ale!

Apache Kafka ak Streaming Data Processing ak Spark Streaming
(Sous imaj)

Entwodiksyon

Natirèlman, trete yon gwo kantite done an tan reyèl bay opòtinite ase pou itilize nan sistèm modèn. Youn nan konbinezon ki pi popilè pou sa a se tandem Apache Kafka ak Spark Streaming, kote Kafka kreye yon kouran nan pake mesaj fèk ap rantre, ak Spark Streaming trete pake sa yo nan yon entèval tan bay.

Pou ogmante tolerans fay aplikasyon an, nou pral sèvi ak baraj. Avèk mekanis sa a, lè motè Spark Streaming la bezwen refè done ki pèdi, li sèlman bezwen tounen nan dènye pòs la epi rezime kalkil apati de la.

Achitekti nan sistèm nan devlope

Apache Kafka ak Streaming Data Processing ak Spark Streaming

Konpozan yo itilize:

  • Apache Kafka se yon distribye pibliye-abònman sistèm mesaj. Apwopriye pou tou de konsomasyon mesaj offline ak sou entènèt. Pou anpeche pèt done, mesaj Kafka yo estoke sou disk epi yo replike nan gwoup la. Sistèm Kafka bati sou sèvis senkronizasyon ZooKeeper;
  • Apache Spark Streaming - Eleman Spark pou trete done difizyon. Modil Spark Streaming la bati lè l sèvi avèk yon achitekti mikwo-pakèt, kote kouran done yo entèprete kòm yon sekans kontinyèl nan ti pake done. Spark Streaming pran done ki soti nan diferan sous epi konbine li nan ti pakè. Nouvo pakè yo kreye nan entèval regilye. Nan kòmansman chak entèval tan, yo kreye yon nouvo pake, epi nenpòt done ki resevwa pandan entèval sa a enkli nan pake a. Nan fen entèval la, kwasans pake sispann. Gwosè entèval la detèmine pa yon paramèt ki rele entèval pakèt la;
  • Apache Spark SQL - konbine pwosesis relasyon ak pwogramasyon fonksyonèl Spark. Done estriktire vle di done ki gen yon chema, sa vle di, yon sèl seri jaden pou tout dosye. Spark SQL sipòte opinyon ki soti nan yon varyete sous done estriktire epi, gras a disponiblite a nan enfòmasyon chema, li ka avèk efikasite rekipere sèlman jaden ki nesesè nan dosye, epi tou li bay API DataFrame;
  • AWS RDS se yon baz done relasyon ki baze sou nwaj relativman chè, sèvis entènèt ki senplifye konfigirasyon, operasyon ak dekale, epi li se Amazon ki administre dirèkteman.

Enstale ak kouri sèvè Kafka la

Anvan w itilize Kafka dirèkteman, ou bezwen asire w ou genyen Java, paske... JVM yo itilize pou travay:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Ann kreye yon nouvo itilizatè pou travay ak Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Apre sa, telechaje distribisyon an nan sit entènèt ofisyèl Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Depake achiv telechaje a:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Pwochen etap la se opsyonèl. Reyalite a se ke anviwònman yo default pa pèmèt ou konplètman itilize tout karakteristik yo nan Apache Kafka. Pa egzanp, efase yon sijè, kategori, gwoup kote yo ka pibliye mesaj yo. Pou chanje sa a, ann edite fichye konfigirasyon an:

vim ~/kafka/config/server.properties

Ajoute sa ki annapre yo nan fen dosye a:

delete.topic.enable = true

Anvan ou kòmanse sèvè Kafka a, ou bezwen kòmanse sèvè ZooKeeper la; nou pral sèvi ak script oksilyè ki vini ak distribisyon Kafka la:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Apre ZooKeeper te kòmanse avèk siksè, lanse sèvè Kafka a nan yon tèminal separe:

bin/kafka-server-start.sh config/server.properties

Ann kreye yon nouvo sijè ki rele Tranzaksyon:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Ann asire w ke yo te kreye yon sijè ak kantite patisyon ki nesesè yo ak replikasyon:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka ak Streaming Data Processing ak Spark Streaming

Ann rate moman teste pwodiktè ak konsomatè yo pou sijè ki fèk kreye a. Plis detay sou fason ou ka teste voye ak resevwa mesaj yo ekri nan dokiman ofisyèl la - Voye kèk mesaj. Oke, nou ale nan ekri yon pwodiktè nan Python lè l sèvi avèk KafkaProducer API la.

Pwodiktè ekri

Pwodiktè a pral jenere done o aza - 100 mesaj chak segonn. Pa done o aza nou vle di yon diksyonè ki gen twa jaden:

  • Branch — non pwen lavant enstitisyon kredi a;
  • Lajan - lajan tranzaksyon;
  • kantite lajan - kantite tranzaksyon an. Kantite lajan an pral yon nimewo pozitif si li se yon achte nan lajan pa Bank la, ak yon nimewo negatif si li se yon vant.

Kòd pou pwodiktè a sanble sa a:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Apre sa, lè l sèvi avèk metòd voye a, nou voye yon mesaj sou sèvè a, nan sijè nou bezwen an, nan fòma JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Lè w ap kouri script la, nou resevwa mesaj sa yo nan tèminal la:

Apache Kafka ak Streaming Data Processing ak Spark Streaming

Sa vle di ke tout bagay ap travay jan nou te vle - pwodiktè a jenere epi voye mesaj sou sijè nou bezwen an.
Pwochen etap la se enstale Spark ak trete kouran mesaj sa a.

Enstale Apache Spark

Apache etensèl se yon platfòm enfòmatik grap inivèsèl ak pèfòmans segondè.

Spark fè pi bon pase enplemantasyon popilè nan modèl MapReduce pandan y ap sipòte yon pakèt kalite kalkil, ki gen ladan demann entèaktif ak pwosesis kouran. Vitès jwe yon wòl enpòtan lè w ap trete gwo kantite done, paske li se vitès ki pèmèt ou travay entèaktif san yo pa depanse minit oswa èdtan ap tann. Youn nan pi gwo fòs Spark ki fè li tèlman vit se kapasite li pou fè kalkil nan memwa.

Fondasyon sa a ekri nan Scala, kidonk ou bezwen enstale li an premye:

sudo apt-get install scala

Telechaje distribisyon Spark la sou sit entènèt ofisyèl la:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Depake achiv la:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Ajoute chemen Spark la nan dosye bash la:

vim ~/.bashrc

Ajoute liy sa yo atravè editè a:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Kouri kòmandman ki anba a apre ou fin fè chanjman nan bashrc:

source ~/.bashrc

Deplwaye AWS PostgreSQL

Tout sa ki rete se deplwaye baz done a nan kote nou pral telechaje enfòmasyon yo trete soti nan kouran yo. Pou sa nou pral sèvi ak sèvis AWS RDS la.

Ale nan konsole AWS -> AWS RDS -> Baz done -> Kreye baz done:
Apache Kafka ak Streaming Data Processing ak Spark Streaming

Chwazi PostgreSQL epi klike sou Next:
Apache Kafka ak Streaming Data Processing ak Spark Streaming

Paske Egzanp sa a se pou rezon edikasyon sèlman; nou pral sèvi ak yon sèvè gratis "nan minimòm" (Niveau gratis):
Apache Kafka ak Streaming Data Processing ak Spark Streaming

Apre sa, nou mete yon tik nan blòk Free Tier, epi apre sa yo pral otomatikman ofri nou yon egzanp nan klas la t2.micro - byenke fèb, li gratis ak byen apwopriye pou travay nou an:
Apache Kafka ak Streaming Data Processing ak Spark Streaming

Apre sa, vini bagay ki enpòtan anpil: non an nan egzanp baz done a, non an nan itilizatè a mèt ak modpas li. Ann bay non egzanp lan: myHabrTest, mèt itilizatè: habr, modpas: habr12345 epi klike sou Next bouton an:
Apache Kafka ak Streaming Data Processing ak Spark Streaming

Nan pwochen paj la gen paramèt ki responsab pou aksè nan sèvè baz done nou an soti deyò (aksesibilite piblik) ak disponiblite pò:

Apache Kafka ak Streaming Data Processing ak Spark Streaming

Ann kreye yon nouvo anviwònman pou gwoup sekirite VPC a, ki pral pèmèt aksè ekstèn nan sèvè baz done nou an atravè pò 5432 (PostgreSQL).
Ann ale nan konsole AWS la nan yon fenèt navigatè separe nan VPC Dashboard la -> Gwoup Sekirite -> Kreye seksyon gwoup sekirite:
Apache Kafka ak Streaming Data Processing ak Spark Streaming

Nou mete yon non pou gwoup Sekirite a - PostgreSQL, yon deskripsyon, endike ki VPC gwoup sa a ta dwe asosye ak klike sou bouton an Kreye:
Apache Kafka ak Streaming Data Processing ak Spark Streaming

Ranpli règ Anthrax pou pò 5432 pou gwoup ki fèk kreye a, jan yo montre nan foto ki anba a. Ou pa ka presize pò a manyèlman, men chwazi PostgreSQL nan lis deroulant Kalite.

Fè egzateman pale, valè a ::/0 vle di disponiblite a nan trafik fèk ap rantre nan sèvè a soti nan tout mond lan, ki se kanonikman pa totalman vre, men analize egzanp lan, se pou nou pèmèt nou sèvi ak apwòch sa a:
Apache Kafka ak Streaming Data Processing ak Spark Streaming

Nou retounen nan paj navigatè a, kote nou gen "Konfigure paramèt avanse" louvri epi chwazi nan seksyon gwoup sekirite VPC -> Chwazi gwoup sekirite VPC ki deja egziste -> PostgreSQL:
Apache Kafka ak Streaming Data Processing ak Spark Streaming

Apre sa, nan opsyon baz done yo -> non baz done -> mete non an - habrDB.

Nou ka kite paramèt ki rete yo, ak eksepsyon de enfimite backup (peryòd retansyon backup - 0 jou), siveyans ak pèfòmans Insights, pa default. Klike sou bouton an Kreye baz done:
Apache Kafka ak Streaming Data Processing ak Spark Streaming

Manadjè fil

Etap final la pral devlopman yon travay Spark, ki pral trete nouvo done ki soti nan Kafka chak de segonn epi antre rezilta a nan baz done a.

Jan nou note pi wo a, pòs yo se yon mekanis debaz nan SparkStreaming ki dwe konfigirasyon pou asire tolerans fay. Nou pral sèvi ak pòs yo epi, si pwosedi a echwe, modil Spark Streaming la pral sèlman bezwen retounen nan dènye pòs yo epi rezime kalkil yo soti nan refè done yo pèdi.

Yo ka pèmèt checkpointing lè w mete yon anyè sou yon sistèm fichye ki toleran ak fyab (tankou HDFS, S3, elatriye) kote yo pral estoke enfòmasyon pòs yo. Sa a se fè lè l sèvi avèk, pou egzanp:

streamingContext.checkpoint(checkpointDirectory)

Nan egzanp nou an, nou pral itilize apwòch sa a, sètadi, si checkpointDirectory egziste, Lè sa a, kontèks la pral rkree nan done yo pòs. Si anyè a pa egziste (sa vle di egzekite pou premye fwa), Lè sa a, yo rele functionToCreateContext pou kreye yon nouvo kontèks ak konfigirasyon DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Nou kreye yon objè DirectStream pou konekte avèk sijè "tranzaksyon an" lè l sèvi avèk metòd createDirectStream bibliyotèk KafkaUtils la:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Analize done k ap rantre nan fòma JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Sèvi ak Spark SQL, nou fè yon gwoupman senp epi montre rezilta a nan konsole a:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Jwenn tèks rechèch la epi kouri li atravè Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Apre sa, nou sove done total ki kapab lakòz yo nan yon tablo nan AWS RDS. Pou sove rezilta aggregasyon yo nan yon tab baz done, nou pral itilize metòd ekri nan objè DataFrame la:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Kèk mo sou konfigirasyon yon koneksyon ak AWS RDS. Nou te kreye itilizatè a ak modpas pou li nan etap "Deplwaye AWS PostgreSQL". Ou ta dwe itilize Endpoint kòm URL sèvè baz done a, ki parèt nan seksyon Koneksyon & sekirite:

Apache Kafka ak Streaming Data Processing ak Spark Streaming

Yo nan lòd yo konekte kòrèkteman Spark ak Kafka, ou ta dwe kouri travay la atravè smark-soumèt lè l sèvi avèk asosye a. etensèl-difizyon-kafka-0-8_2.11. Anplis de sa, nou pral sèvi ak yon zafè tou pou kominike avèk baz done PostgreSQL la; nou pral transfere yo atravè --packages.

Pou fleksibilite nan script la, nou pral enkli tou kòm paramèt opinyon non an nan sèvè mesaj la ak sijè ki soti nan ki nou vle resevwa done.

Se konsa, li lè yo lanse ak tcheke fonksyonalite sistèm nan:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Tout bagay te mache byen! Kòm ou ka wè nan foto ki anba a, pandan aplikasyon an ap fonksyone, nouvo rezilta agrégasyon yo ap soti chak 2 segonn, paske nou mete entèval pakèt la a 2 segonn lè nou te kreye objè a StreamingContext:

Apache Kafka ak Streaming Data Processing ak Spark Streaming

Apre sa, nou fè yon rechèch senp nan baz done a pou tcheke prezans dosye nan tablo a tranzaksyon_flow:

Apache Kafka ak Streaming Data Processing ak Spark Streaming

Konklizyon

Atik sa a te gade yon egzanp sou pwosesis kouran enfòmasyon lè l sèvi avèk Spark Streaming an konjonksyon avèk Apache Kafka ak PostgreSQL. Avèk kwasans done ki soti nan divès sous, li difisil pou ègzajere valè pratik Spark Streaming pou kreye aplikasyon difizyon ak an tan reyèl.

Ou ka jwenn tout kòd sous la nan depo mwen an nan GitHub.

Mwen kontan diskite sou atik sa a, mwen tann kòmantè ou yo, epi mwen espere tou pou kritik konstriktif nan men tout lektè ki pran swen.

Mwen swete w siksè!

. Okòmansman, li te planifye pou itilize yon baz done PostgreSQL lokal, men lè m renmen AWS, mwen te deside deplase baz done a nan nwaj la. Nan pwochen atik sou sijè sa a, mwen pral montre kijan pou aplike tout sistèm ki dekri pi wo a nan AWS lè l sèvi avèk AWS Kinesis ak AWS EMR. Swiv nouvèl la!

Sous: www.habr.com

Add nouvo kòmantè