Apache Kafka and Streaming Data Processing with Spark Streaming

Hello, Habr! Today we will build a system that processes Apache Kafka message streams with Spark Streaming and writes the processing results to an AWS RDS cloud database.

Imagine that a credit institution gives us the task of processing incoming transactions "on the fly" across all of its branches. This could be used, for example, to promptly calculate the open currency position for the Treasury, limits, or the financial result of deals.

How to implement this case without magic and spells: read on! Let's go!


Introduction

Processing large volumes of data in real time opens up many opportunities for modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming: Kafka provides a stream of incoming message packets, and Spark Streaming processes these packets at a fixed time interval.

To make the application more fault tolerant, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to return to the last checkpoint and resume computation from there.

Architecture of the system

[Figure: system architecture]

Components used:

  • Apache Kafka is a distributed publish-subscribe messaging system. It is suitable for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. Kafka is built on top of the ZooKeeper synchronization service;
  • Apache Spark Streaming is the Spark component for processing streaming data. It uses a micro-batch architecture, in which the data stream is treated as a continuous sequence of small data packets. Spark Streaming takes data from various sources and combines it into small packets, and new packets are created at regular intervals. At the start of each interval a new packet is created, and any data received during that interval is added to it. At the end of the interval the packet stops growing. The length of the interval is set by a parameter called the batch interval;
  • Apache Spark SQL combines relational processing with Spark's functional programming. Structured data is data that has a schema, that is, a single set of fields shared by all records. Spark SQL supports input from a variety of structured data sources and, because schema information is available, can efficiently retrieve only the required fields of records. It also provides the DataFrame API;
  • AWS RDS is a relatively inexpensive cloud-based relational database: a web service, administered directly by Amazon, that simplifies setup, operation, and scaling.
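The micro-batch model described for Spark Streaming can be illustrated with a small pure-Python sketch. This is not how Spark implements it internally; it only shows how events are assigned to batches by arrival time. The function name and the input format, a list of (timestamp_in_seconds, payload) pairs, are hypothetical; the batch interval of 2 seconds matches the value used later in this article.

```python
from collections import defaultdict


def group_into_batches(events, batch_interval):
    """Assign (timestamp, payload) events to micro-batches.

    Batch k covers the half-open interval
    [k * batch_interval, (k + 1) * batch_interval).
    Returns a dict mapping batch index -> payloads in arrival order;
    intervals without events do not appear in the result.
    """
    if batch_interval <= 0:
        raise ValueError("batch_interval must be positive")
    batches = defaultdict(list)
    for timestamp, payload in events:
        batches[int(timestamp // batch_interval)].append(payload)
    return dict(batches)


events = [(0.1, "a"), (1.9, "b"), (2.0, "c"), (5.5, "d")]
print(group_into_batches(events, 2))  # {0: ['a', 'b'], 1: ['c'], 2: ['d']}
```

An event arriving exactly on a boundary (2.0 here) starts the next batch, which is why the intervals are half-open.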

Installing and running the Kafka server

Before using Kafka, make sure Java is installed, because Kafka runs on the JVM:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Let's create a new user for working with Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Next, download the distribution from an official Apache Kafka mirror:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack the downloaded archive:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

The next step is optional. The default settings do not let you use all of Apache Kafka's capabilities; for example, you cannot delete a topic (the category or group to which messages are published). To change this, edit the configuration file:

vim ~/kafka/config/server.properties

Add the following line to the end of the file:

delete.topic.enable = true
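If you prefer to script this change (for example, when provisioning several brokers), a small idempotent helper can set the property only once. This is a hypothetical sketch that works on the file's text; it handles simple `key=value` lines only and does not cover every corner of the Java properties format (line continuations, `:` separators).

```python
def ensure_property(text, key, value):
    """Return properties-file text in which `key` is set to `value`.

    An existing `key = ...` line (spaces around '=' allowed) is replaced;
    otherwise the setting is appended at the end. Comment lines are skipped.
    """
    lines = text.splitlines()
    new_line = "{}={}".format(key, value)
    for i, line in enumerate(lines):
        stripped = line.strip()
        if stripped.startswith("#") or "=" not in stripped:
            continue
        if stripped.split("=", 1)[0].strip() == key:
            lines[i] = new_line
            return "\n".join(lines) + "\n"
    lines.append(new_line)
    return "\n".join(lines) + "\n"


print(ensure_property("broker.id=0\n", "delete.topic.enable", "true"))
```

To apply it, read ~/kafka/config/server.properties, pass its contents through ensure_property(text, "delete.topic.enable", "true"), and write the result back; running it a second time leaves the file unchanged.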

Before starting the Kafka server, you need to start the ZooKeeper server. We will use the helper script that ships with the Kafka distribution:

cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Once ZooKeeper has started successfully, start the Kafka server in a separate terminal:

bin/kafka-server-start.sh config/server.properties

Let's create a new topic called transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Let's check that the topic was created with the required number of partitions and replication factor:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

[Screenshot: description of the transaction topic]

We will skip testing the producer and consumer on the newly created topic. How to test sending and receiving messages is described in the official documentation (Send some messages). Let's move on to writing a producer in Python using the KafkaProducer API.

Writing the producer

The producer will generate random data: 100 messages every second. By random data we mean a dictionary with three fields:

  • branch: the name of the credit institution's point of sale;
  • currency: the transaction currency;
  • amount: the transaction amount. The amount is positive when the Bank buys currency and negative when it sells.
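The sign convention for amount can be captured in a small validation helper, for example to sanity-check messages on the consumer side. This is a hypothetical sketch: the function name and the label "none" for a zero amount are assumptions, since the generator shown below can produce 0, which is neither a purchase nor a sale.

```python
REQUIRED_FIELDS = ("branch", "currency", "amount")


def transaction_direction(record):
    """Validate a transaction dict and return 'buy', 'sell' or 'none'.

    A positive amount means the bank buys currency, a negative amount
    means it sells; zero is reported as 'none'.
    """
    missing = [field for field in REQUIRED_FIELDS if field not in record]
    if missing:
        raise ValueError("missing fields: {}".format(", ".join(missing)))
    amount = record["amount"]
    # bool is a subclass of int, so reject it explicitly
    if isinstance(amount, bool) or not isinstance(amount, int):
        raise TypeError("amount must be an integer")
    if amount > 0:
        return "buy"
    if amount < 0:
        return "sell"
    return "none"


print(transaction_direction({"branch": "Kazan", "currency": "USD", "amount": 42}))  # buy
```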

The producer code looks like this:

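from numpy.random import choice, randint

def get_random_value():
    """Build one random transaction: branch, currency and signed amount."""
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    # randint's upper bound is exclusive, so amount is in [-100, 99].
    # It returns numpy.int64, which json.dumps cannot serialize,
    # so convert it to a plain Python int.
    new_dict['amount'] = int(randint(-100, 100))

    return new_dict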

Next, using the send method, we send a message in JSON format to the server, to the topic we need:

from json import dumps

from kafka import KafkaProducer

# Serialize each value as UTF-8 encoded JSON and compress batches with gzip
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    # send() is asynchronous; get() blocks until the broker acknowledges the write
    future = producer.send(topic=my_topic, value=data)
    record_metadata = future.get(timeout=10)

    print('--> The message has been sent to a topic: '
          '{}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))

except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()
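The snippet above sends a single message. To approach the rate of 100 messages per second mentioned earlier, the send call can be wrapped in a loop that emits a burst of messages and then sleeps for the rest of the second. A minimal sketch with hypothetical names: produce_at_rate, send and make_record are not part of the original script, and the clock and sleep functions are injectable so the loop can be exercised without a broker.

```python
import time


def produce_at_rate(send, make_record, per_second=100, seconds=1,
                    clock=time.monotonic, sleep=time.sleep):
    """Call send(make_record()) per_second times in each of `seconds` rounds.

    After each round the function sleeps for whatever is left of that
    second, so the average rate does not exceed per_second messages/s.
    Returns the total number of records sent.
    """
    sent = 0
    for _ in range(seconds):
        started = clock()
        for _ in range(per_second):
            send(make_record())
            sent += 1
        remaining = 1.0 - (clock() - started)
        if remaining > 0:
            sleep(remaining)
    return sent
```

In the producer script this could be called as produce_at_rate(lambda r: producer.send(my_topic, value=r), get_random_value, seconds=60), followed by producer.flush() so that buffered messages are delivered before the script exits.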

When we run the script, we get the following messages in the terminal:

[Screenshot: producer output in the terminal]

This means everything works as intended: the producer creates messages and sends them to the topic we need.
The next step is to install Spark and process this message stream.

Installing Apache Spark

Apache Spark is a general-purpose, high-performance cluster computing platform.

Spark outperforms popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed matters a great deal when processing large volumes of data, because it is speed that lets you work interactively instead of spending minutes or hours waiting. One of Spark's key strengths, and a major reason for its speed, is its ability to perform computations in memory.

The framework is written in Scala, so you need to install Scala first:

sudo apt-get install scala

Download the Spark distribution from an official Apache mirror:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Unpack the archive:

sudo mkdir -p /usr/local/spark
sudo tar xvf spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark --strip-components=1

Add the Spark path to your bash configuration file:

vim ~/.bashrc

Add the following lines in the editor:

export SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

After editing .bashrc, run the following command:

source ~/.bashrc

Deploying AWS PostgreSQL

All that remains is to deploy the database into which we will load the processed data from the streams. For this we will use the AWS RDS service.

Go to the AWS console -> AWS RDS -> Databases -> Create database:
[Screenshot: AWS RDS, Create database]

Select PostgreSQL and click Next:
[Screenshot: selecting the PostgreSQL engine]

Since this example is for educational purposes only, we will use a free server with minimal resources (Free Tier):
[Screenshot: choosing the Free Tier]

Next, we tick the Free Tier box, after which we are offered a t2.micro instance: it is weak, but it is free and quite adequate for our task:
[Screenshot: Free Tier checkbox and the t2.micro instance class]

Next come the really important settings: the database instance name, the master user name, and its password. Let's name the instance myHabrTest, the master user habr, and the password habr12345, then click Next:
[Screenshot: instance identifier and master credentials]

The next page contains the parameters that control whether our database server is reachable from outside (Public accessibility) and which port it listens on:

[Screenshot: Public accessibility and port settings]

Let's create a new VPC security group setting that will allow external access to our database server on port 5432 (PostgreSQL).
In a separate browser window, open the AWS console and go to VPC Dashboard -> Security Groups -> Create security group:
[Screenshot: VPC Dashboard, Create security group]

We set the security group name (PostgreSQL) and a description, specify the VPC the group should be associated with, and click Create:
[Screenshot: security group name, description and VPC]

Fill in the inbound rules for port 5432 in the newly created group, as shown in the picture below. You don't need to enter the port manually: just select PostgreSQL from the Type drop-down list.

Strictly speaking, the value ::/0 means that inbound traffic to the server is allowed from anywhere in the world, which is not good practice, but for the purposes of this example let's allow ourselves this shortcut:
[Screenshot: inbound rule for PostgreSQL on port 5432]

Return to the browser page where "Configure advanced settings" is open, and in the VPC security groups section select Choose existing VPC security groups -> PostgreSQL:
[Screenshot: selecting the existing PostgreSQL security group]

Next, under Database options -> Database name, set the name habrDB.

The remaining parameters can be left at their defaults, except that we disable backups (backup retention period: 0 days), monitoring, and Performance Insights. Click the Create database button:
[Screenshot: remaining settings and the Create database button]

Stream handler

The final stage is developing the Spark job, which will process new data arriving from Kafka every two seconds and load the result into the database.

As noted above, checkpoints are a core mechanism in Spark Streaming and must be configured to provide fault tolerance. We will use checkpoints so that, if the job fails, the Spark Streaming module only needs to return to the last checkpoint and resume computation from it to recover the lost data.

Checkpointing is enabled by setting a directory in a fault-tolerant, reliable file system (such as HDFS or S3) where checkpoint information will be stored. This is done, for example, with:

streamingContext.checkpoint(checkpointDirectory)

In our example we will use the following approach: if checkpointDirectory exists, the context is recreated from the checkpoint data. If the directory does not exist (i.e. on the first run), functionToCreateContext is called to create a new context and set up the DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
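The recover-or-create control flow of getOrCreate can be illustrated outside Spark with a toy analogue that keeps a tiny piece of state (here, a processed-batch counter) in a checkpoint directory. This is not Spark's implementation: the helper names and the state.json file name are hypothetical. It only shows the logic: restore from the checkpoint if one exists, otherwise build fresh state with the factory function.

```python
import json
import os


def get_or_create_state(checkpoint_dir, create_state):
    """Restore state from checkpoint_dir if present, else call create_state().

    Returns (state, recovered) where recovered tells which path was taken.
    """
    path = os.path.join(checkpoint_dir, "state.json")
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f), True   # recovered from the checkpoint
    return create_state(), False        # first run: fresh state


def save_checkpoint(checkpoint_dir, state):
    """Write state to a temp file, then rename it over the old checkpoint.

    The rename means a crash mid-write never leaves a half-written state.json.
    """
    os.makedirs(checkpoint_dir, exist_ok=True)
    path = os.path.join(checkpoint_dir, "state.json")
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(state, f)
    os.replace(tmp_path, path)
```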

We create a DirectStream object connected to the "transaction" topic using the createDirectStream method of KafkaUtils:

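from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# The application name is illustrative
conf = SparkConf().setAppName("treasury_stream")
sc = SparkContext(conf=conf)
# Batch interval of 2 seconds: a new micro-batch is formed every 2 s
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

# Direct (receiver-less) stream from the Kafka 0-8 integration
directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                  [topic],
                                                  {"metadata.broker.list": broker_list})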
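With the 0-8 integration, each element of directKafkaStream is a (key, value) pair, and by default the value is the message decoded as a UTF-8 string. The JSON written by the producer therefore still has to be parsed before the fields can be accessed by name. The parsing step can be written as a plain function and tested without Spark; the name parse_message is hypothetical, and in the job it would be applied with something like directKafkaStream.map(parse_message).

```python
import json


def parse_message(kafka_record):
    """Turn a (key, value) pair from the direct stream into a transaction dict.

    The key is ignored (the producer does not set one); the value is the
    JSON document produced by KafkaProducer's value_serializer.
    """
    _key, value = kafka_record
    message = json.loads(value)
    return {
        "branch": message["branch"],
        "currency": message["currency"],
        "amount": int(message["amount"]),
    }


print(parse_message((None, '{"branch": "Kazan", "currency": "USD", "amount": -15}')))
```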

Converting the incoming JSON data into a DataFrame:

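from pyspark.sql import Row, SparkSession

# rdd is one micro-batch of already-parsed JSON dicts (e.g. obtained via
# directKafkaStream.map(lambda kv: json.loads(kv[1])) and foreachRDD)
spark = SparkSession.builder.config(conf=rdd.context.getConf()).getOrCreate()
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                               currency=w['currency'],
                               amount=w['amount']))

testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")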

Using Spark SQL, we perform a simple grouping and print the result to the console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency
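To make explicit what this query computes for each micro-batch, here is the same aggregation in plain Python: the amounts of all records in the batch are summed per (branch, currency) pair. The function name and the sample batch are hypothetical, and the curr_time column is omitted.

```python
from collections import defaultdict


def aggregate_batch(records):
    """Sum `amount` per (branch, currency), like the GROUP BY query above."""
    totals = defaultdict(int)
    for record in records:
        totals[(record["branch"], record["currency"])] += record["amount"]
    return dict(totals)


batch = [
    {"branch": "Kazan", "currency": "USD", "amount": 30},
    {"branch": "Kazan", "currency": "USD", "amount": -10},
    {"branch": "SPB", "currency": "EUR", "amount": 5},
]
print(aggregate_batch(batch))  # {('Kazan', 'USD'): 20, ('SPB', 'EUR'): 5}
```

Because purchases are positive and sales negative, batch_value is the net amount bought (or, if negative, sold) in that currency at that branch during one batch interval.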

Getting the query text and running it through Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
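The helper get_sql_query is not shown in the text. A minimal sketch, assuming it simply returns the query above as a string; the view name treasury_stream must match the one registered with createOrReplaceTempView.

```python
def get_sql_query():
    """Return the per-batch aggregation query (hypothetical helper)."""
    return """
        select
            from_unixtime(unix_timestamp()) as curr_time,
            t.branch                        as branch_name,
            t.currency                      as currency_code,
            sum(amount)                     as batch_value
        from treasury_stream t
        group by
            t.branch,
            t.currency
    """
```

Keeping the SQL in one function makes it easy to change the aggregation without touching the streaming code.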

Next, we save the aggregated data to a table in AWS RDS. To write the aggregation results to a database table, we use the write method of the DataFrame object:

# Append each aggregated micro-batch to the transaction_flow table via JDBC
(testResultDataFrame.write
    .format("jdbc")
    .mode("append")
    .option("driver", 'org.postgresql.Driver')
    .option("url", "jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB")
    .option("dbtable", "transaction_flow")
    .option("user", "habr")
    .option("password", "habr12345")
    .save())

A few words about configuring the connection to AWS RDS. We created the user and its password in the "Deploying AWS PostgreSQL" step. As the database server URL, use the Endpoint shown in the Connectivity & security section:

[Screenshot: the Endpoint in the Connectivity & security section]
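Rather than hard-coding the endpoint and password in the job, the JDBC URL can be assembled from the Endpoint shown in the console and the credentials read from environment variables. A sketch only: the function name and the environment variable names PG_USER and PG_PASSWORD are assumptions, not part of the original job.

```python
import os


def jdbc_options(endpoint, database, port=5432, env=None):
    """Build JDBC options for DataFrame.write; credentials come from env."""
    env = os.environ if env is None else env
    return {
        "driver": "org.postgresql.Driver",
        "url": "jdbc:postgresql://{}:{}/{}".format(endpoint, port, database),
        "user": env["PG_USER"],
        "password": env["PG_PASSWORD"],
    }
```

The resulting dict can be passed in one call, e.g. testResultDataFrame.write.format("jdbc").mode("append").options(**opts).option("dbtable", "transaction_flow").save(), which keeps the password out of the source code.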

To connect Spark and Kafka, run the job with spark-submit using the spark-streaming-kafka-0-8_2.11 artifact. We also need an artifact for working with the PostgreSQL database; both are passed via --packages.

To keep the script flexible, we also pass the message broker address and the topic we want to read from as input parameters.
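One way to read these two parameters is argparse. A sketch; the argument names broker_list and topic are assumptions chosen to match the positional values in the spark-submit call (localhost:9092 transaction).

```python
import argparse


def parse_args(argv=None):
    """Parse the broker list and topic passed after the script name."""
    parser = argparse.ArgumentParser(
        description="Kafka -> Spark Streaming -> PostgreSQL job")
    parser.add_argument("broker_list", help="Kafka brokers, e.g. localhost:9092")
    parser.add_argument("topic", help="Kafka topic to consume, e.g. transaction")
    return parser.parse_args(argv)


args = parse_args(["localhost:9092", "transaction"])
print(args.broker_list, args.topic)  # localhost:9092 transaction
```

Inside the job, calling parse_args() with no argument reads sys.argv, so the values given after spark_job.py become args.broker_list and args.topic.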

Now it is time to launch the system and see how it works:

spark-submit \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,org.postgresql:postgresql:9.4.1207 \
spark_job.py localhost:9092 transaction

Everything works! As you can see in the screenshot, while the application is running, new aggregation results are printed every 2 seconds, because we set the batch interval to 2 seconds when creating the StreamingContext object:

[Screenshot: aggregation results printed every 2 seconds]

Next, we run a simple query against the database to check that records have appeared in the transaction_flow table:

[Screenshot: query result for the transaction_flow table]
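The check can also be scripted. The sketch below uses Python's built-in sqlite3 module as a local stand-in for PostgreSQL so that it runs without the RDS instance; the column names follow the aliases in the Spark SQL query, while the sample rows are made up purely for illustration. Against RDS, the same SELECT would be sent through a PostgreSQL client such as psql.

```python
import sqlite3


def count_by_branch(conn):
    """Return (branch_name, number_of_rows) pairs from transaction_flow."""
    query = """
        select branch_name, count(*) as n
        from transaction_flow
        group by branch_name
        order by branch_name
    """
    return conn.execute(query).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("create table transaction_flow (curr_time text, branch_name text,"
             " currency_code text, batch_value integer)")
conn.executemany(
    "insert into transaction_flow values (?, ?, ?, ?)",
    [("t0", "Kazan", "USD", 20),   # illustrative rows only
     ("t1", "Kazan", "EUR", -5),
     ("t1", "SPB", "GBP", 7)],
)
print(count_by_branch(conn))  # [('Kazan', 2), ('SPB', 1)]
```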

Conclusion

This article walked through an example of streaming data processing with Spark Streaming together with Apache Kafka and PostgreSQL. As data from various sources keeps growing, it is hard to overstate the practical value of Spark Streaming for building streaming and real-time applications.

You can find the full source code in my GitHub repository.

I am happy to discuss this article, look forward to your comments, and hope for constructive criticism from all attentive readers.

I wish you success!

P.S. I originally planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the entire system described above in AWS using AWS Kinesis and AWS EMR. Stay tuned!

Source: www.habr.com
