Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Tere, Habr! Täna ehitame süsteemi, mis töötleb Spark Streamingi abil Apache Kafka sõnumivooge ja kirjutab töötlemise tulemused AWS RDS-i pilvandmebaasi.

Kujutagem ette, et teatud krediidiasutus seab meile ülesandeks töödelda sissetulevaid tehinguid "lennult" kõigis oma filiaalides. Seda saab teha selleks, et kiiresti välja arvutada riigikassa avatud valuutapositsioon, tehingute limiidid või finantstulemused jne.

Kuidas seda juhtumit rakendada ilma maagiat ja võluloitsu kasutamata – loe lõike alt! Mine!

Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga
(Pildi allikas)

Sissejuhatus

Loomulikult pakub suure hulga andmete töötlemine reaalajas rohkelt kasutusvõimalusi tänapäevastes süsteemides. Üks populaarsemaid kombinatsioone selleks on Apache Kafka ja Spark Streaming tandem, kus Kafka loob sissetulevate sõnumipakettide voo ja Spark Streaming töötleb neid pakette etteantud ajaintervalli järel.

Rakenduse veataluvuse suurendamiseks kasutame kontrollpunkte. Selle mehhanismiga, kui Spark Streamingi mootor peab kaotatud andmed taastama, peab ta minema tagasi ainult viimasesse kontrollpunkti ja jätkama sealt arvutusi.

Arendatava süsteemi arhitektuur

Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Kasutatud komponendid:

  • Apache Kafka on hajutatud avaldamise ja tellimise sõnumsidesüsteem. Sobib nii võrguühenduseta kui ka võrguühenduseta sõnumite tarbimiseks. Andmete kadumise vältimiseks salvestatakse Kafka sõnumid kettale ja kopeeritakse klastris. Kafka süsteem on üles ehitatud ZooKeeperi sünkroonimisteenusele;
  • Apache Spark Streaming - Sädekomponent voogesituse andmete töötlemiseks. Spark Streaming moodul on ehitatud kasutades mikropartiiarhitektuuri, kus andmevoogu tõlgendatakse väikeste andmepakettide pideva jadana. Spark Streaming võtab andmeid erinevatest allikatest ja ühendab need väikesteks pakettideks. Uusi pakette luuakse korrapäraste ajavahemike järel. Iga ajaintervalli alguses luuakse uus pakett ja kõik selle intervalli jooksul saadud andmed kaasatakse paketti. Intervalli lõpus pakettide kasv peatub. Intervalli suurus määratakse parameetriga, mida nimetatakse partii intervalliks;
  • Apache Spark SQL - ühendab relatsioonitöötluse Sparki funktsionaalse programmeerimisega. Struktureeritud andmed on andmed, millel on skeem, st kõigi kirjete jaoks üks väljade komplekt. Spark SQL toetab sisendit erinevatest struktureeritud andmeallikatest ja tänu skeemiteabe kättesaadavusele saab see tõhusalt hankida ainult nõutavad kirjeväljad ning pakub ka DataFrame API-sid;
  • AWS RDS on suhteliselt odav pilvepõhine relatsiooniandmebaas, veebiteenus, mis lihtsustab seadistamist, kasutamist ja skaleerimist ning mida haldab otse Amazon.

Kafka serveri installimine ja käitamine

Enne Kafka otsest kasutamist peate veenduma, et teil on Java, sest... JVM-i kasutatakse tööks:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Loome Kafkaga töötamiseks uue kasutaja:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Järgmisena laadige distributsioon alla ametlikult Apache Kafka veebisaidilt:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Pakkige allalaaditud arhiiv lahti:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Järgmine samm on valikuline. Fakt on see, et vaikesätted ei võimalda teil kõiki Apache Kafka võimalusi täielikult kasutada. Näiteks kustutage teema, kategooria, grupp, kuhu saab sõnumeid avaldada. Selle muutmiseks redigeerime konfiguratsioonifaili:

vim ~/kafka/config/server.properties

Lisage faili lõppu järgmine tekst:

delete.topic.enable = true

Enne Kafka serveri käivitamist peate käivitama ZooKeeperi serveri; kasutame Kafka distributsiooniga kaasas olevat abiskripti:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Kui ZooKeeper on edukalt käivitunud, käivitage Kafka server eraldi terminalis:

bin/kafka-server-start.sh config/server.properties

Teeme uue teema nimega Tehing:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Veenduge, et oleks loodud vajaliku arvu partitsioonide ja replikatsiooniga teema:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Jätame vahele hetked, mil vastloodud teema puhul tootja ja tarbija proovile panevad. Lisateavet sõnumite saatmise ja vastuvõtmise testimise kohta leiate ametlikust dokumentatsioonist - Saatke mõned sõnumid. Liigume edasi Pythonis tootja kirjutamise juurde, kasutades KafkaProduceri API-d.

Tootja kirjutamine

Tootja genereerib juhuslikke andmeid – 100 sõnumit sekundis. Juhuslike andmete all peame silmas sõnastikku, mis koosneb kolmest väljast:

  • Filiaal — krediidiasutuse müügikoha nimi;
  • valuuta — tehingu valuuta;
  • summa — tehingu summa. Summa on positiivne arv, kui see on panga poolt valuuta ost, ja negatiivne arv, kui tegemist on müügiga.

Tootja kood näeb välja selline:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Järgmisena saadame saatmismeetodit kasutades JSON-vormingus sõnumi serverisse, meile vajalikule teemale:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Skripti käivitamisel saame terminalis järgmised teated:

Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

See tähendab, et kõik toimib nii, nagu soovisime – produtsent genereerib ja saadab meile vajalikule teemale sõnumeid.
Järgmine samm on Sparki installimine ja selle sõnumivoo töötlemine.

Apache Sparki installimine

Apache Spark on universaalne ja suure jõudlusega kobararvutusplatvorm.

Spark toimib paremini kui MapReduce'i mudeli populaarsed teostused, toetades samal ajal laiemat arvutustüüpide valikut, sealhulgas interaktiivseid päringuid ja vootöötlust. Kiirus mängib olulist rolli suurte andmemahtude töötlemisel, kuna just kiirus võimaldab teil interaktiivselt töötada ilma minuteid või tunde ootamata. Üks Sparki suurimaid tugevusi, mis teeb selle nii kiireks, on võime teha mälusiseseid arvutusi.

See raamistik on kirjutatud Scalas, seega peate selle esmalt installima:

sudo apt-get install scala

Laadige Sparki distributsioon alla ametlikult veebisaidilt:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Pakkige arhiiv lahti:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Lisage bash-faili Sparki tee:

vim ~/.bashrc

Lisage redaktori kaudu järgmised read:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Pärast bashrc muudatuste tegemist käivitage allolev käsk:

source ~/.bashrc

AWS PostgreSQL juurutamine

Jääb vaid juurutada andmebaas, kuhu me voogudest töödeldud teabe üles laadime. Selleks kasutame AWS RDS teenust.

Avage AWS-i konsool -> AWS RDS -> andmebaasid -> loo andmebaas:
Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Valige PostgreSQL ja klõpsake nuppu Edasi:
Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Sest See näide on ainult hariduslikel eesmärkidel; kasutame tasuta serverit "vähemalt" (Free Tier):
Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Järgmisena paneme linnukese plokki Free Tier ja pärast seda pakutakse meile automaatselt t2.micro klassi eksemplari - kuigi nõrk, on see tasuta ja meie ülesande jaoks üsna sobiv:
Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Edasi tulevad väga olulised asjad: andmebaasi eksemplari nimi, peakasutaja nimi ja parool. Nimetagem eksemplar: myHabrTest, peakasutaja: habr, parool: habr12345 ja klõpsake nuppu Edasi:
Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Järgmisel lehel on parameetrid, mis vastutavad meie andmebaasiserveri juurdepääsetavuse eest väljastpoolt (avalik juurdepääsetavus) ja pordi kättesaadavuse eest:

Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Loome VPC turvagrupi jaoks uue sätte, mis võimaldab välist juurdepääsu meie andmebaasiserverile pordi 5432 (PostgreSQL) kaudu.
Liigume AWS-i konsooli eraldi brauseriaknas VPC armatuurlauale -> Turvagrupid -> Turvarühma loomine:
Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Määrasime turvarühma nime - PostgreSQL, kirjelduse, märkige, millise VPC-ga see rühm peaks olema seotud, ja klõpsake nuppu Loo:
Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Täitke vastloodud rühma jaoks pordi 5432 sissetulemise reeglid, nagu on näidatud alloleval pildil. Porti ei saa käsitsi määrata, vaid vali ripploendist Tüüp PostgreSQL.

Väärtus ::/0 tähendab rangelt võttes kogu maailmast serverisse sissetuleva liikluse kättesaadavust, mis kanooniliselt ei ole päris tõsi, kuid näite analüüsimiseks lubagem kasutada seda lähenemist:
Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Naaseme brauseri lehele, kus on avatud "Täpsemate sätete seadistamine" ja valige jaotises VPC turvarühmad -> Vali olemasolevad VPC turvarühmad -> PostgreSQL:
Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Järgmisena määrake jaotises Andmebaasi valikud -> Andmebaasi nimi -> nimi - habrDB.

Ülejäänud parameetrid, välja arvatud varundamise keelamine (varunduse säilitamise periood - 0 päeva), jälgimise ja Performance Insightsi, saame vaikimisi jätta. Klõpsake nuppu Loo andmebaas:
Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Keermekäsitleja

Viimases etapis töötatakse välja Spark töö, mis töötleb iga kahe sekundi tagant Kafkalt tulevaid uusi andmeid ja sisestab tulemuse andmebaasi.

Nagu eespool märgitud, on kontrollpunktid SparkStreamingu põhimehhanism, mis tuleb tõrketaluvuse tagamiseks konfigureerida. Kasutame kontrollpunkte ja kui protseduur ebaõnnestub, peab Spark Streaming moodul naasma vaid viimasesse kontrollpunkti ja jätkama sealt arvutusi, et taastada kaotatud andmed.

Kontrollpunkti saab lubada, seadistades tõrketaluvas ja usaldusväärses failisüsteemis (nt HDFS, S3 jne) kataloogi, kuhu kontrollpunkti teave salvestatakse. Seda tehakse kasutades näiteks:

streamingContext.checkpoint(checkpointDirectory)

Meie näites kasutame järgmist lähenemist, nimelt kui checkpointDirectory on olemas, siis luuakse kontrollpunkti andmetest kontekst uuesti. Kui kataloogi pole olemas (st käivitatakse esimest korda), kutsutakse funktsiooni functionToCreateContext, et luua uus kontekst ja konfigureerida DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Loome DirectStreami objekti, et ühenduda "tehingu" teemaga, kasutades KafkaUtilsi teegi createDirectStream meetodit:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Sissetulevate andmete sõelumine JSON-vormingus:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Spark SQL-i abil teeme lihtsa rühmitamise ja kuvame tulemuse konsoolis:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Päringu teksti hankimine ja selle käitamine Spark SQL-i kaudu:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Seejärel salvestame saadud koondandmed AWS RDS-i tabelisse. Koondamistulemuste salvestamiseks andmebaasi tabelisse kasutame DataFrame'i objekti kirjutamismeetodit:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Paar sõna AWS RDS-iga ühenduse loomise kohta. Lõime selle jaoks kasutaja ja parooli etapis „AWS PostgreSQL juurutamine”. Peaksite kasutama Endpointi andmebaasiserveri URL-ina, mis kuvatakse jaotises Ühenduvus ja turvalisus:

Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Sparki ja Kafka õigeks ühendamiseks peaksite töö käivitama smark-submit kaudu, kasutades artefakti spark-streaming-kafka-0-8_2.11. Lisaks kasutame PostgreSQL-i andmebaasiga suhtlemiseks ka artefakti, edastame need pakettide kaudu.

Skripti paindlikkuse huvides lisame sisendparameetritena ka sõnumiserveri nime ja teema, kust andmeid soovime saada.

Niisiis, on aeg käivitada ja kontrollida süsteemi funktsionaalsust:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Kõik õnnestus! Nagu näete alloleval pildil, väljastatakse rakenduse töötamise ajal uued koondtulemused iga 2 sekundi järel, kuna seadsime StreamingContext objekti loomisel partiimise intervalliks 2 sekundit:

Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Järgmisena teeme andmebaasi lihtsa päringu, et kontrollida kirjete olemasolu tabelis tehingu_voog:

Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga

Järeldus

Selles artiklis vaadeldi näidet teabe vootöötlusest, kasutades Spark Streamingut koos Apache Kafka ja PostgreSQL-iga. Erinevatest allikatest pärit andmete arvu kasvuga on raske ülehinnata Spark Streamingu praktilist väärtust voogesituse ja reaalajas rakenduste loomisel.

Täieliku lähtekoodi leiate minu hoidlast aadressil GitHub.

Mul on hea meel seda artiklit arutada, ootan teie kommentaare ja loodan ka kõigi hoolivate lugejate konstruktiivset kriitikat.

Soovin teile edu!

Ps. Algselt oli plaanis kasutada kohalikku PostgreSQL andmebaasi, kuid arvestades minu armastust AWS-i vastu, otsustasin andmebaasi pilve kolida. Järgmises selleteemalises artiklis näitan, kuidas rakendada kogu ülalkirjeldatud süsteemi AWS-is, kasutades AWS Kinesis ja AWS EMR. Jälgi uudiseid!

Allikas: www.habr.com

Lisa kommentaar