Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Halló, Habr! Í dag munum við byggja kerfi sem mun vinna úr Apache Kafka skilaboðastraumum með því að nota Spark Streaming og skrifa vinnsluniðurstöðurnar í AWS RDS skýjagagnagrunninn.

Við skulum ímynda okkur að ákveðin lánastofnun setji okkur það verkefni að vinna komandi færslur „í flugi“ í öllum útibúum sínum. Þetta er hægt að gera í þeim tilgangi að reikna tafarlaust út opna gjaldeyrisstöðu fyrir ríkissjóð, takmarkanir eða afkomuviðskipti o.fl.

Hvernig á að útfæra þetta mál án þess að nota töfra og galdra - lestu undir skurðinum! Farðu!

Apache Kafka og streymandi gagnavinnsla með Spark Streaming
(Myndheimild)

Inngangur

Að sjálfsögðu veitir vinnsla á miklu magni gagna í rauntíma næg tækifæri til notkunar í nútímakerfum. Ein vinsælasta samsetningin fyrir þetta er samhliða Apache Kafka og Spark Streaming, þar sem Kafka býr til straum af komandi skilaboðapökkum og Spark Streaming vinnur þessa pakka á tilteknu tímabili.

Til að auka bilanaþol forritsins munum við nota eftirlitsstöðvar. Með þessu kerfi, þegar Spark Streaming vélin þarf að endurheimta týnd gögn, þarf hún aðeins að fara aftur á síðasta eftirlitsstað og halda áfram útreikningum þaðan.

Arkitektúr þróaðs kerfis

Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Íhlutir notaðir:

  • Apache Kafka er dreift skilaboðakerfi fyrir birtingu og áskrift. Hentar bæði fyrir skilaboðanotkun án nettengingar og á netinu. Til að koma í veg fyrir gagnatap eru Kafka skilaboð geymd á diski og afrituð innan klasans. Kafka kerfið er byggt ofan á ZooKeeper samstillingarþjónustuna;
  • Apache Spark streymi - Spark hluti til að vinna úr streymisgögnum. Spark Streaming einingin er byggð með því að nota örlotu arkitektúr, þar sem gagnastraumurinn er túlkaður sem samfelld röð lítilla gagnapakka. Spark Streaming tekur gögn frá mismunandi aðilum og sameinar þau í litla pakka. Nýir pakkar eru búnir til með reglulegu millibili. Í upphafi hvers tímabils er nýr pakki búinn til og öll gögn sem berast á því tímabili eru innifalin í pakkanum. Í lok tímabilsins stöðvast pakkavöxtur. Stærð bilsins er ákvörðuð af færibreytu sem kallast lotubil;
  • Apache Spark SQL - sameinar venslavinnslu með Spark hagnýtri forritun. Skipulögð gögn þýðir gögn sem hafa skema, það er eitt sett af reitum fyrir allar færslur. Spark SQL styður inntak frá ýmsum skipulögðum gagnaveitum og, þökk sé tiltækum skemaupplýsingum, getur það á skilvirkan hátt endurheimt aðeins nauðsynlega færslureit, og býður einnig upp á DataFrame API;
  • AWS RDS er tiltölulega ódýr skýjabundinn venslagagnagrunnur, vefþjónusta sem einfaldar uppsetningu, rekstur og stærðarstærð og er stjórnað beint af Amazon.

Að setja upp og keyra Kafka netþjóninn

Áður en þú notar Kafka beint þarftu að ganga úr skugga um að þú sért með Java, því... JVM er notað fyrir vinnu:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Búum til nýjan notanda til að vinna með Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Næst skaltu hlaða niður dreifingunni frá opinberu Apache Kafka vefsíðunni:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Taktu niður skjalasafnið:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Næsta skref er valfrjálst. Staðreyndin er sú að sjálfgefnar stillingar leyfa þér ekki að fullnýta alla möguleika Apache Kafka. Til dæmis, eyða efni, flokki, hópi sem hægt er að birta skilaboð í. Til að breyta þessu skulum við breyta stillingarskránni:

vim ~/kafka/config/server.properties

Bættu eftirfarandi við endann á skránni:

delete.topic.enable = true

Áður en þú byrjar Kafka þjóninn þarftu að ræsa ZooKeeper þjóninn; við munum nota aukaforskriftina sem fylgir Kafka dreifingunni:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Eftir að ZooKeeper hefur byrjað með góðum árangri skaltu ræsa Kafka netþjóninn í sérstakri flugstöð:

bin/kafka-server-start.sh config/server.properties

Búum til nýtt efni sem heitir Færsla:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Við skulum ganga úr skugga um að efni með nauðsynlegum fjölda skiptinga og afritunar hafi verið búið til:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Við skulum missa af augnablikunum við að prófa framleiðandann og neytandann fyrir nýstofnað efni. Frekari upplýsingar um hvernig þú getur prófað að senda og taka á móti skilaboðum eru skrifaðar í opinberu skjölunum - Sendu nokkur skilaboð. Jæja, við höldum áfram að skrifa framleiðanda í Python með KafkaProducer API.

Framleiðandi skrifar

Framleiðandinn mun búa til handahófskennd gögn - 100 skilaboð á hverri sekúndu. Með handahófskenndum gögnum er átt við orðabók sem samanstendur af þremur sviðum:

  • Branch — heiti sölustaðar lánastofnunar;
  • Gjaldmiðill — viðskiptagjaldmiðill;
  • Upphæð — viðskiptaupphæð. Upphæðin verður jákvæð tala ef um er að ræða gjaldeyriskaup bankans og neikvæð tala ef um sölu er að ræða.

Kóðinn fyrir framleiðandann lítur svona út:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Næst, með því að nota sendingaraðferðina, sendum við skilaboð til netþjónsins, á það efni sem við þurfum, á JSON sniði:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Þegar handritið er keyrt fáum við eftirfarandi skilaboð í flugstöðinni:

Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Þetta þýðir að allt virkar eins og við vildum - framleiðandinn býr til og sendir skilaboð á það efni sem við þurfum.
Næsta skref er að setja upp Spark og vinna úr þessum skilaboðastraumi.

Að setja upp Apache Spark

Apache Spark er alhliða og afkastamikill klasatölvuvettvangur.

Spark skilar betri árangri en vinsælar útfærslur á MapReduce líkaninu á sama tíma og hann styður við fjölbreyttari útreikningagerðir, þar á meðal gagnvirkar fyrirspurnir og straumvinnslu. Hraði gegnir mikilvægu hlutverki þegar unnið er úr miklu magni af gögnum, þar sem það er hraði sem gerir þér kleift að vinna gagnvirkt án þess að eyða mínútum eða klukkustundum í að bíða. Einn stærsti styrkur Spark sem gerir hann svo hraðvirkan er hæfileiki þess til að framkvæma útreikninga í minni.

Þessi rammi er skrifaður í Scala, svo þú þarft að setja hann upp fyrst:

sudo apt-get install scala

Sæktu Spark dreifinguna frá opinberu vefsíðunni:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Taktu upp skjalasafnið:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Bættu slóðinni að Spark við bash skrána:

vim ~/.bashrc

Bættu við eftirfarandi línum í gegnum ritstjórann:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Keyrðu skipunina hér að neðan eftir að hafa gert breytingar á bashrc:

source ~/.bashrc

Innleiðir AWS PostgreSQL

Það eina sem er eftir er að dreifa gagnagrunninum sem við munum hlaða upp unnnum upplýsingum úr straumunum í. Til þess munum við nota AWS RDS þjónustuna.

Farðu í AWS stjórnborðið -> AWS RDS -> Gagnagrunnar -> Búðu til gagnagrunn:
Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Veldu PostgreSQL og smelltu á Next:
Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Vegna þess að Þetta dæmi er eingöngu í fræðsluskyni; við munum nota ókeypis netþjón „að lágmarki“ (Free Tier):
Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Næst setjum við hak í Free Tier blokkina og eftir það verður okkur sjálfkrafa boðið tilvik af t2.micro bekknum - þó það sé veikt, þá er það ókeypis og hentar vel fyrir okkar verkefni:
Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Næst koma mjög mikilvæg atriði: nafn gagnagrunnsins, nafn aðalnotanda og lykilorð hans. Við skulum nefna dæmið: myHabrTest, aðalnotandi: habr, lykilorð: habr12345 og smelltu á Næsta hnappinn:
Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Á næstu síðu eru færibreytur sem bera ábyrgð á aðgengi gagnagrunnsþjónsins okkar að utan (almenningsaðgengi) og aðgengi að höfnum:

Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Við skulum búa til nýja stillingu fyrir VPC öryggishópinn, sem mun leyfa ytri aðgang að gagnagrunnsþjóninum okkar í gegnum port 5432 (PostgreSQL).
Við skulum fara í AWS stjórnborðið í sérstökum vafraglugga í VPC stjórnborðið -> Öryggishópar -> Búa til öryggishópshluta:
Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Við stillum nafnið fyrir öryggishópinn - PostgreSQL, lýsingu, tilgreina hvaða VPC þessi hópur ætti að tengjast og smelltu á Búa til hnappinn:
Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Fylltu út reglurnar á heimleið fyrir höfn 5432 fyrir nýstofnaðan hóp, eins og sýnt er á myndinni hér að neðan. Þú getur ekki tilgreint gáttina handvirkt, en veldu PostgreSQL úr fellilistanum Tegund.

Strangt til tekið þýðir gildið ::/0 framboð á komandi umferð á netþjóninn frá öllum heimshornum, sem er kanónískt ekki alveg satt, en til að greina dæmið skulum við leyfa okkur að nota þessa nálgun:
Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Við snúum aftur á vafrasíðuna, þar sem við höfum „Stilla háþróaðar stillingar“ opna og veljum í hlutanum VPC öryggishópar -> Veldu núverandi VPC öryggishópa -> PostgreSQL:
Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Næst, í gagnagrunnsvalkostunum -> Gagnagrunnsheiti -> stilltu nafnið - habrDB.

Við getum skilið eftir færibreyturnar sem eftir eru, að undanskildum því að slökkva á öryggisafritun (varðveislutími öryggisafrits - 0 dagar), eftirlit og árangursinnsýn, sjálfgefið. Smelltu á hnappinn Búa til gagnagrunn:
Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Þráðamaður

Lokastigið verður þróun Spark-vinnu sem mun vinna úr nýjum gögnum sem koma frá Kafka á tveggja sekúndna fresti og setja niðurstöðuna inn í gagnagrunninn.

Eins og fram kemur hér að ofan eru eftirlitsstöðvar kjarnabúnaður í SparkStreaming sem verður að stilla til að tryggja bilanaþol. Við munum nota eftirlitsstöðvar og ef aðferðin mistekst þarf Spark Streaming einingin aðeins að fara aftur á síðasta eftirlitsstað og halda áfram útreikningum frá honum til að endurheimta týnd gögn.

Hægt er að virkja eftirlitsstöð með því að setja möppu á bilunarþolnu, áreiðanlegu skráarkerfi (svo sem HDFS, S3, osfrv.) þar sem upplýsingar um eftirlitsstað verða geymdar. Þetta er gert með því að nota til dæmis:

streamingContext.checkpoint(checkpointDirectory)

Í dæminu okkar munum við nota eftirfarandi nálgun, nefnilega ef checkpointDirectory er til, þá verður samhengið endurskapað úr checkpoint gögnunum. Ef skráin er ekki til (þ.e. keyrð í fyrsta skipti), þá er functionToCreateContext kallað til að búa til nýtt samhengi og stilla DSreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Við búum til DirectStream hlut til að tengjast „færslu“ efninu með því að nota createDirectStream aðferð KafkaUtils bókasafnsins:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Að þátta gögn sem berast á JSON sniði:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Með því að nota Spark SQL gerum við einfalda flokkun og birtum niðurstöðuna í stjórnborðinu:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Að fá fyrirspurnartextann og keyra hann í gegnum Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Og svo vistum við samanlögð gögn sem myndast í töflu í AWS RDS. Til að vista niðursöfnunarniðurstöðurnar í gagnagrunnstöflu munum við nota skrifaaðferðina á DataFrame hlutnum:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Nokkur orð um að setja upp tengingu við AWS RDS. Við bjuggum til notandann og lykilorðið fyrir það í „Deploying AWS PostgreSQL“ skrefinu. Þú ættir að nota Endpoint sem vefslóð gagnagrunnsþjónsins, sem birtist í Tengingar og öryggi hlutanum:

Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Til þess að tengja Spark og Kafka rétt, ættir þú að keyra verkið með smark-submit með því að nota artifact neistastreymi-kafka-0-8_2.11. Að auki munum við einnig nota grip til að hafa samskipti við PostgreSQL gagnagrunninn; við munum flytja þá í gegnum --pakka.

Fyrir sveigjanleika handritsins munum við einnig setja inn sem innsláttarfæribreytur nafn skilaboðaþjónsins og efnið sem við viljum fá gögn frá.

Svo það er kominn tími til að ræsa og athuga virkni kerfisins:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Allt gekk upp! Eins og þú sérð á myndinni hér að neðan, á meðan forritið er í gangi, eru nýjar samsöfnunarniðurstöður birtar á 2 sekúndna fresti, vegna þess að við stilltum lotutímabilið á 2 sekúndur þegar við bjuggum til StreamingContext hlutinn:

Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Næst gerum við einfalda fyrirspurn í gagnagrunninn til að athuga hvort færslur séu í töflunni færsluflæði:

Apache Kafka og streymandi gagnavinnsla með Spark Streaming

Ályktun

Þessi grein skoðaði dæmi um straumvinnslu upplýsinga með því að nota Spark Streaming í tengslum við Apache Kafka og PostgreSQL. Með vexti gagna frá ýmsum aðilum er erfitt að ofmeta hagnýtt gildi Spark Streaming til að búa til streymi og rauntímaforrit.

Þú getur fundið frumkóðann í heild sinni í geymslunni minni á GitHub.

Ég er fús til að ræða þessa grein, ég hlakka til athugasemda þinna og ég vona líka eftir uppbyggilegri gagnrýni frá öllum umhyggjusamum lesendum.

Ég óska ​​þér velgengni!

Ps. Upphaflega var áætlað að nota staðbundinn PostgreSQL gagnagrunn, en í ljósi ást minnar á AWS ákvað ég að færa gagnagrunninn yfir í skýið. Í næstu grein um þetta efni mun ég sýna hvernig á að innleiða allt kerfið sem lýst er hér að ofan í AWS með AWS Kinesis og AWS EMR. Fylgstu með fréttum!

Heimild: www.habr.com

Bæta við athugasemd