ProHoster > Blogi > Haldamine > Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga
Apache Kafka ja voogesituse andmetöötlus koos Spark Streaminguga
Tere, Habr! Täna ehitame süsteemi, mis töötleb Spark Streamingi abil Apache Kafka sõnumivooge ja kirjutab töötlemise tulemused AWS RDS-i pilvandmebaasi.
Kujutagem ette, et teatud krediidiasutus seab meile ülesandeks töödelda sissetulevaid tehinguid "lennult" kõigis oma filiaalides. Seda saab teha selleks, et kiiresti välja arvutada riigikassa avatud valuutapositsioon, tehingute limiidid või finantstulemused jne.
Kuidas seda juhtumit rakendada ilma maagiat ja võluloitsu kasutamata – loe lõike alt! Mine!
Loomulikult pakub suure hulga andmete töötlemine reaalajas rohkelt kasutusvõimalusi tänapäevastes süsteemides. Üks populaarsemaid kombinatsioone selleks on Apache Kafka ja Spark Streaming tandem, kus Kafka loob sissetulevate sõnumipakettide voo ja Spark Streaming töötleb neid pakette etteantud ajaintervalli järel.
Rakenduse veataluvuse suurendamiseks kasutame kontrollpunkte. Selle mehhanismiga, kui Spark Streamingi mootor peab kaotatud andmed taastama, peab ta minema tagasi ainult viimasesse kontrollpunkti ja jätkama sealt arvutusi.
Arendatava süsteemi arhitektuur
Kasutatud komponendid:
Apache Kafka on hajutatud avaldamise ja tellimise sõnumsidesüsteem. Sobib nii võrguühenduseta kui ka võrguühenduseta sõnumite tarbimiseks. Andmete kadumise vältimiseks salvestatakse Kafka sõnumid kettale ja kopeeritakse klastris. Kafka süsteem on üles ehitatud ZooKeeperi sünkroonimisteenusele;
Apache Spark Streaming - Sädekomponent voogesituse andmete töötlemiseks. Spark Streaming moodul on ehitatud kasutades mikropartiiarhitektuuri, kus andmevoogu tõlgendatakse väikeste andmepakettide pideva jadana. Spark Streaming võtab andmeid erinevatest allikatest ja ühendab need väikesteks pakettideks. Uusi pakette luuakse korrapäraste ajavahemike järel. Iga ajaintervalli alguses luuakse uus pakett ja kõik selle intervalli jooksul saadud andmed kaasatakse paketti. Intervalli lõpus pakettide kasv peatub. Intervalli suurus määratakse parameetriga, mida nimetatakse partii intervalliks;
Apache Spark SQL - ühendab relatsioonitöötluse Sparki funktsionaalse programmeerimisega. Struktureeritud andmed on andmed, millel on skeem, st kõigi kirjete jaoks üks väljade komplekt. Spark SQL toetab sisendit erinevatest struktureeritud andmeallikatest ja tänu skeemiteabe kättesaadavusele saab see tõhusalt hankida ainult nõutavad kirjeväljad ning pakub ka DataFrame API-sid;
AWS RDS on suhteliselt odav pilvepõhine relatsiooniandmebaas, veebiteenus, mis lihtsustab seadistamist, kasutamist ja skaleerimist ning mida haldab otse Amazon.
Kafka serveri installimine ja käitamine
Enne Kafka otsest kasutamist peate veenduma, et teil on Java, sest... JVM-i kasutatakse tööks:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Järgmine samm on valikuline. Fakt on see, et vaikesätted ei võimalda teil kõiki Apache Kafka võimalusi täielikult kasutada. Näiteks kustutage teema, kategooria, grupp, kuhu saab sõnumeid avaldada. Selle muutmiseks redigeerime konfiguratsioonifaili:
vim ~/kafka/config/server.properties
Lisage faili lõppu järgmine tekst:
delete.topic.enable = true
Enne Kafka serveri käivitamist peate käivitama ZooKeeperi serveri; kasutame Kafka distributsiooniga kaasas olevat abiskripti:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Kui ZooKeeper on edukalt käivitunud, käivitage Kafka server eraldi terminalis:
Jätame vahele hetked, mil vastloodud teema puhul tootja ja tarbija proovile panevad. Lisateavet sõnumite saatmise ja vastuvõtmise testimise kohta leiate ametlikust dokumentatsioonist - Saatke mõned sõnumid. Liigume edasi Pythonis tootja kirjutamise juurde, kasutades KafkaProduceri API-d.
Tootja kirjutamine
Tootja genereerib juhuslikke andmeid – 100 sõnumit sekundis. Juhuslike andmete all peame silmas sõnastikku, mis koosneb kolmest väljast:
Filiaal — krediidiasutuse müügikoha nimi;
valuuta — tehingu valuuta;
summa — tehingu summa. Summa on positiivne arv, kui see on panga poolt valuuta ost, ja negatiivne arv, kui tegemist on müügiga.
Järgmisena saadame saatmismeetodit kasutades JSON-vormingus sõnumi serverisse, meile vajalikule teemale:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Skripti käivitamisel saame terminalis järgmised teated:
See tähendab, et kõik toimib nii, nagu soovisime – produtsent genereerib ja saadab meile vajalikule teemale sõnumeid.
Järgmine samm on Sparki installimine ja selle sõnumivoo töötlemine.
Apache Sparki installimine
Apache Spark on universaalne ja suure jõudlusega kobararvutusplatvorm.
Spark toimib paremini kui MapReduce'i mudeli populaarsed teostused, toetades samal ajal laiemat arvutustüüpide valikut, sealhulgas interaktiivseid päringuid ja vootöötlust. Kiirus mängib olulist rolli suurte andmemahtude töötlemisel, kuna just kiirus võimaldab teil interaktiivselt töötada ilma minuteid või tunde ootamata. Üks Sparki suurimaid tugevusi, mis teeb selle nii kiireks, on võime teha mälusiseseid arvutusi.
See raamistik on kirjutatud Scalas, seega peate selle esmalt installima:
sudo apt-get install scala
Laadige Sparki distributsioon alla ametlikult veebisaidilt:
Sest See näide on ainult hariduslikel eesmärkidel; kasutame tasuta serverit "vähemalt" (Free Tier):
Järgmisena paneme linnukese plokki Free Tier ja pärast seda pakutakse meile automaatselt t2.micro klassi eksemplari - kuigi nõrk, on see tasuta ja meie ülesande jaoks üsna sobiv:
Edasi tulevad väga olulised asjad: andmebaasi eksemplari nimi, peakasutaja nimi ja parool. Nimetagem eksemplar: myHabrTest, peakasutaja: habr, parool: habr12345 ja klõpsake nuppu Edasi:
Järgmisel lehel on parameetrid, mis vastutavad meie andmebaasiserveri juurdepääsetavuse eest väljastpoolt (avalik juurdepääsetavus) ja pordi kättesaadavuse eest:
Loome VPC turvagrupi jaoks uue sätte, mis võimaldab välist juurdepääsu meie andmebaasiserverile pordi 5432 (PostgreSQL) kaudu.
Liigume AWS-i konsooli eraldi brauseriaknas VPC armatuurlauale -> Turvagrupid -> Turvarühma loomine:
Määrasime turvarühma nime - PostgreSQL, kirjelduse, märkige, millise VPC-ga see rühm peaks olema seotud, ja klõpsake nuppu Loo:
Täitke vastloodud rühma jaoks pordi 5432 sissetulemise reeglid, nagu on näidatud alloleval pildil. Porti ei saa käsitsi määrata, vaid vali ripploendist Tüüp PostgreSQL.
Väärtus ::/0 tähendab rangelt võttes kogu maailmast serverisse sissetuleva liikluse kättesaadavust, mis kanooniliselt ei ole päris tõsi, kuid näite analüüsimiseks lubagem kasutada seda lähenemist:
Naaseme brauseri lehele, kus on avatud "Täpsemate sätete seadistamine" ja valige jaotises VPC turvarühmad -> Vali olemasolevad VPC turvarühmad -> PostgreSQL:
Järgmisena määrake jaotises Andmebaasi valikud -> Andmebaasi nimi -> nimi - habrDB.
Ülejäänud parameetrid, välja arvatud varundamise keelamine (varunduse säilitamise periood - 0 päeva), jälgimise ja Performance Insightsi, saame vaikimisi jätta. Klõpsake nuppu Loo andmebaas:
Keermekäsitleja
Viimases etapis töötatakse välja Spark töö, mis töötleb iga kahe sekundi tagant Kafkalt tulevaid uusi andmeid ja sisestab tulemuse andmebaasi.
Nagu eespool märgitud, on kontrollpunktid SparkStreamingu põhimehhanism, mis tuleb tõrketaluvuse tagamiseks konfigureerida. Kasutame kontrollpunkte ja kui protseduur ebaõnnestub, peab Spark Streaming moodul naasma vaid viimasesse kontrollpunkti ja jätkama sealt arvutusi, et taastada kaotatud andmed.
Kontrollpunkti saab lubada, seadistades tõrketaluvas ja usaldusväärses failisüsteemis (nt HDFS, S3 jne) kataloogi, kuhu kontrollpunkti teave salvestatakse. Seda tehakse kasutades näiteks:
streamingContext.checkpoint(checkpointDirectory)
Meie näites kasutame järgmist lähenemist, nimelt kui checkpointDirectory on olemas, siis luuakse kontrollpunkti andmetest kontekst uuesti. Kui kataloogi pole olemas (st käivitatakse esimest korda), kutsutakse funktsiooni functionToCreateContext, et luua uus kontekst ja konfigureerida DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Loome DirectStreami objekti, et ühenduda "tehingu" teemaga, kasutades KafkaUtilsi teegi createDirectStream meetodit:
Spark SQL-i abil teeme lihtsa rühmitamise ja kuvame tulemuse konsoolis:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Päringu teksti hankimine ja selle käitamine Spark SQL-i kaudu:
Seejärel salvestame saadud koondandmed AWS RDS-i tabelisse. Koondamistulemuste salvestamiseks andmebaasi tabelisse kasutame DataFrame'i objekti kirjutamismeetodit:
Paar sõna AWS RDS-iga ühenduse loomise kohta. Lõime selle jaoks kasutaja ja parooli etapis „AWS PostgreSQL juurutamine”. Peaksite kasutama Endpointi andmebaasiserveri URL-ina, mis kuvatakse jaotises Ühenduvus ja turvalisus:
Sparki ja Kafka õigeks ühendamiseks peaksite töö käivitama smark-submit kaudu, kasutades artefakti spark-streaming-kafka-0-8_2.11. Lisaks kasutame PostgreSQL-i andmebaasiga suhtlemiseks ka artefakti, edastame need pakettide kaudu.
Skripti paindlikkuse huvides lisame sisendparameetritena ka sõnumiserveri nime ja teema, kust andmeid soovime saada.
Niisiis, on aeg käivitada ja kontrollida süsteemi funktsionaalsust:
Kõik õnnestus! Nagu näete alloleval pildil, väljastatakse rakenduse töötamise ajal uued koondtulemused iga 2 sekundi järel, kuna seadsime StreamingContext objekti loomisel partiimise intervalliks 2 sekundit:
Järgmisena teeme andmebaasi lihtsa päringu, et kontrollida kirjete olemasolu tabelis tehingu_voog:
Järeldus
Selles artiklis vaadeldi näidet teabe vootöötlusest, kasutades Spark Streamingut koos Apache Kafka ja PostgreSQL-iga. Erinevatest allikatest pärit andmete arvu kasvuga on raske ülehinnata Spark Streamingu praktilist väärtust voogesituse ja reaalajas rakenduste loomisel.
Täieliku lähtekoodi leiate minu hoidlast aadressil GitHub.
Mul on hea meel seda artiklit arutada, ootan teie kommentaare ja loodan ka kõigi hoolivate lugejate konstruktiivset kriitikat.
Soovin teile edu!
Ps. Algselt oli plaanis kasutada kohalikku PostgreSQL andmebaasi, kuid arvestades minu armastust AWS-i vastu, otsustasin andmebaasi pilve kolida. Järgmises selleteemalises artiklis näitan, kuidas rakendada kogu ülalkirjeldatud süsteemi AWS-is, kasutades AWS Kinesis ja AWS EMR. Jälgi uudiseid!