ProHoster > Blogi > antaminen > Apache Kafka ja Streaming Data Processing with Spark Streaming
Apache Kafka ja Streaming Data Processing with Spark Streaming
Hei, Habr! Tänään rakennamme järjestelmän, joka käsittelee Apache Kafka -viestivirtoja Spark Streamingin avulla ja kirjoittaa käsittelytulokset AWS RDS -pilvitietokantaan.
Kuvitellaan, että tietty luottolaitos asettaa meille tehtävän käsitellä saapuvat tapahtumat "lennossa" kaikissa sen konttoreissa. Tämä voidaan tehdä, jotta voidaan nopeasti laskea avoin valuuttapositio treasurylle, limiitit tai transaktioiden taloudelliset tulokset jne.
Kuinka toteuttaa tämä tapaus ilman taikuutta ja taikuutta - lue leikkauksen alta! Mennä!
Tietenkin suuren tietomäärän käsittely reaaliajassa tarjoaa runsaasti käyttömahdollisuuksia nykyaikaisissa järjestelmissä. Yksi suosituimmista yhdistelmistä tähän on Apache Kafkan ja Spark Streamingin tandem, jossa Kafka luo virran saapuvista viestipaketteista ja Spark Streaming käsittelee nämä paketit tietyin aikavälein.
Käytämme tarkistuspisteitä lisätäksemme sovelluksen vikasietoisuutta. Tämän mekanismin avulla, kun Spark Streaming -moottorin on palautettava kadonneet tiedot, sen tarvitsee vain palata viimeiseen tarkistuspisteeseen ja jatkaa laskelmia sieltä.
Kehitetyn järjestelmän arkkitehtuuri
Käytetyt komponentit:
Apache Kafka on hajautettu julkaisu-tilaa -viestijärjestelmä. Sopii sekä offline- että online-viestien kulutukseen. Tietojen katoamisen estämiseksi Kafka-viestit tallennetaan levylle ja kopioidaan klusterin sisällä. Kafka-järjestelmä on rakennettu ZooKeeper-synkronointipalvelun päälle;
Apache Spark -suoratoisto - Spark-komponentti suoratoistodatan käsittelyyn. Spark Streaming -moduuli on rakennettu käyttämällä mikroeräarkkitehtuuria, jossa datavirta tulkitaan jatkuvana pienten datapakettien sarjana. Spark Streaming ottaa dataa eri lähteistä ja yhdistää ne pieniksi paketeiksi. Uusia paketteja luodaan säännöllisin väliajoin. Kunkin aikavälin alussa luodaan uusi paketti, ja kaikki kyseisen ajanjakson aikana vastaanotettu data sisällytetään pakettiin. Intervallin lopussa pakettien kasvu pysähtyy. Intervallin koko määräytyy parametrilla, jota kutsutaan eräväliksi;
Apache Spark SQL - yhdistää relaatiokäsittelyn Spark-toiminnalliseen ohjelmointiin. Strukturoidulla tiedolla tarkoitetaan dataa, jolla on skeema, eli yksi kenttäjoukko kaikille tietueille. Spark SQL tukee syöttöä useista strukturoiduista tietolähteistä ja skeematietojen saatavuuden ansiosta se voi tehokkaasti hakea vain vaaditut tietuekentät ja tarjoaa myös DataFrame API:t;
AWS RDS on suhteellisen edullinen pilvipohjainen relaatiotietokanta, verkkopalvelu, joka yksinkertaistaa asennusta, käyttöä ja skaalausta ja jota hallinnoi suoraan Amazon.
Kafka-palvelimen asennus ja käyttö
Ennen kuin käytät Kafkaa suoraan, sinun on varmistettava, että sinulla on Java, koska... JVM:ää käytetään työhön:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Seuraava vaihe on valinnainen. Tosiasia on, että oletusasetukset eivät anna sinun käyttää kaikkia Apache Kafkan ominaisuuksia. Poista esimerkiksi aihe, luokka tai ryhmä, jolle viestejä voidaan julkaista. Muuttaaksesi tämän, muokataan asetustiedostoa:
vim ~/kafka/config/server.properties
Lisää tiedoston loppuun seuraava:
delete.topic.enable = true
Ennen kuin käynnistät Kafka-palvelimen, sinun on käynnistettävä ZooKeeper-palvelin; käytämme Kafka-jakelun mukana tulevaa apuohjelmaa:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Kun ZooKeeper on käynnistynyt onnistuneesti, käynnistä Kafka-palvelin erillisessä terminaalissa:
Jätetään väliin tuottajan ja kuluttajan testaushetket juuri luodun aiheen suhteen. Lisätietoja viestien lähettämisen ja vastaanottamisen testaamisesta on kirjoitettu virallisessa dokumentaatiossa - Lähetä joitain viestejä. No, siirrymme tuottajan kirjoittamiseen Pythonissa KafkaProducer API:n avulla.
Tuottajan kirjoittaminen
Tuottaja tuottaa satunnaista dataa - 100 viestiä sekunnissa. Satunnaisilla tiedoilla tarkoitamme sanakirjaa, joka koostuu kolmesta kentästä:
Sivuliike — luottolaitoksen myyntipisteen nimi;
valuutta — tapahtuman valuutta;
Määrä - Siirtosumma. Summa on positiivinen luku, jos kyseessä on pankin valuutan osto, ja negatiivinen luku, jos kyseessä on myynti.
Seuraavaksi lähetämme lähetysmenetelmällä viestin palvelimelle, tarvitsemamme aiheeseen, JSON-muodossa:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Kun suoritamme komentosarjaa, saamme seuraavat viestit terminaaliin:
Tämä tarkoittaa, että kaikki toimii niin kuin halusimme – tuottaja luo ja lähettää viestejä tarvitsemamme aiheeseen.
Seuraava vaihe on asentaa Spark ja käsitellä tämä viestivirta.
Apache Sparkin asennus
Apache Spark on yleinen ja tehokas klusterilaskenta-alusta.
Spark toimii paremmin kuin MapReduce-mallin suositut toteutukset ja tukee samalla laajempaa valikoimaa laskentatyyppejä, mukaan lukien interaktiiviset kyselyt ja stream-käsittely. Nopeudella on tärkeä rooli suuria tietomääriä käsiteltäessä, koska se on nopeus, jonka avulla voit työskennellä interaktiivisesti ilman minuutteja tai tunteja odottamiseen. Yksi Sparkin suurimmista vahvuuksista, jotka tekevät siitä niin nopean, on sen kyky suorittaa muistilaskelmia.
Tämä kehys on kirjoitettu Scalassa, joten sinun on asennettava se ensin:
Suorita alla oleva komento, kun olet tehnyt muutoksia bashrc:iin:
source ~/.bashrc
AWS PostgreSQL:n käyttöönotto
Jäljelle jää vain ottaa käyttöön tietokanta, johon lataamme virroista käsitellyt tiedot. Käytämme tähän AWS RDS -palvelua.
Siirry AWS-konsoliin -> AWS RDS -> Tietokannat -> Luo tietokanta:
Valitse PostgreSQL ja napsauta Seuraava:
Koska Tämä esimerkki on tarkoitettu vain opetustarkoituksiin; käytämme ilmaista palvelinta "vähintään" (Free Tier):
Seuraavaksi laitamme rastin Free Tier -lohkoon, ja sen jälkeen meille tarjotaan automaattisesti t2.micro-luokan esiintymä - vaikka se on heikko, se on ilmainen ja sopiva tehtäväämme:
Seuraavaksi tulevat erittäin tärkeät asiat: tietokannan ilmentymän nimi, pääkäyttäjän nimi ja hänen salasanansa. Nimetään ilmentymä: myHabrTest, pääkäyttäjä: habr, Salasana: habr12345 ja napsauta Seuraava-painiketta:
Seuraavalla sivulla on parametrit, jotka vastaavat tietokantapalvelimemme käytettävyydestä ulkopuolelta (Julkinen saavutettavuus) ja porttien saatavuudesta:
Luodaan VPC-suojausryhmälle uusi asetus, joka mahdollistaa ulkoisen pääsyn tietokantapalvelimellemme portin 5432 (PostgreSQL) kautta.
Siirrytään AWS-konsoliin erillisessä selainikkunassa VPC Dashboard -> Suojausryhmät -> Luo suojausryhmä -osioon:
Asetamme suojausryhmän nimen - PostgreSQL, kuvauksen, ilmoitamme mihin VPC:hen tämä ryhmä tulee liittää ja napsauta Luo-painiketta:
Täytä portin 5432 saapuvat säännöt äskettäin luodulle ryhmälle alla olevan kuvan mukaisesti. Et voi määrittää porttia manuaalisesti, vaan valitse avattavasta Type-luettelosta PostgreSQL.
Tarkkaan ottaen arvo ::/0 tarkoittaa saapuvan liikenteen saatavuutta palvelimelle kaikkialta maailmasta, mikä ei kanonisesti ole täysin totta, mutta esimerkin analysoimiseksi sallitaanpa käyttää tätä lähestymistapaa:
Palaamme selainsivulle, jossa on "Määritä lisäasetukset" auki ja valitse VPC-suojausryhmät -> Valitse olemassa olevat VPC-suojausryhmät -> PostgreSQL:
Seuraavaksi aseta tietokannan asetukset -> Tietokannan nimi -> nimi - habrDB.
Voimme jättää muut parametrit, lukuun ottamatta varmuuskopioinnin (varmuuskopion säilytysaika - 0 päivää), seurantaa ja Performance Insights -toimintoja, oletusarvoisesti. Napsauta painiketta Luo tietokanta:
Langan käsittelijä
Viimeisessä vaiheessa kehitetään Spark-työ, joka käsittelee kahden sekunnin välein Kafkasta tulevaa uutta tietoa ja syöttää tuloksen tietokantaan.
Kuten yllä todettiin, tarkistuspisteet ovat SparkStreamingin ydinmekanismi, joka on määritettävä vikasietoisuuden varmistamiseksi. Käytämme tarkistuspisteitä, ja jos toimenpide epäonnistuu, Spark Streaming -moduulin tarvitsee vain palata viimeiseen tarkistuspisteeseen ja jatkaa laskelmia siitä palauttaakseen kadonneet tiedot.
Tarkistuspisteen voi ottaa käyttöön asettamalla vikasietoiseen, luotettavaan tiedostojärjestelmään (kuten HDFS, S3 jne.) hakemiston, johon tarkistuspisteen tiedot tallennetaan. Tämä tehdään käyttämällä esim.
streamingContext.checkpoint(checkpointDirectory)
Esimerkissämme käytämme seuraavaa lähestymistapaa, nimittäin jos checkpointDirectory on olemassa, konteksti luodaan uudelleen tarkistuspistetiedoista. Jos hakemistoa ei ole olemassa (eli suoritetaan ensimmäistä kertaa), functionToCreateContext kutsutaan luomaan uusi konteksti ja määrittämään DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Luomme DirectStream-objektin muodostamaan yhteyden "tapahtuma"-aiheeseen KafkaUtils-kirjaston createDirectStream-menetelmällä:
Spark SQL:n avulla teemme yksinkertaisen ryhmittelyn ja näytämme tuloksen konsolissa:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Kyselytekstin hakeminen ja sen suorittaminen Spark SQL:n kautta:
Ja sitten tallennamme tuloksena saadut aggregoidut tiedot taulukkoon AWS RDS:ssä. Tallentaaksemme koontitulokset tietokantataulukkoon käytämme DataFrame-objektin kirjoitusmenetelmää:
Muutama sana yhteyden muodostamisesta AWS RDS:ään. Loimme sille käyttäjän ja salasanan "AWS PostgreSQL:n käyttöönotto" -vaiheessa. Sinun tulisi käyttää Endpointia tietokantapalvelimen URL-osoitteena, joka näkyy Yhteydet ja suojaus -osiossa:
Jotta Spark ja Kafka voidaan yhdistää oikein, sinun tulee suorittaa työ smark-submitilla käyttämällä artefaktia spark-streaming-kafka-0-8_2.11. Lisäksi käytämme myös artefaktia vuorovaikutuksessa PostgreSQL-tietokannan kanssa, siirrämme ne ---pakettien kautta.
Skriptin joustavuuden vuoksi sisällytämme syöttöparametreiksi myös viestipalvelimen nimen ja aiheen, josta tietoja halutaan vastaanottaa.
Joten on aika käynnistää ja tarkistaa järjestelmän toimivuus:
Kaikki sujui! Kuten alla olevasta kuvasta näkyy, sovelluksen ollessa käynnissä uudet koontitulokset tulostetaan 2 sekunnin välein, koska asetimme eräväliksi 2 sekuntia, kun loimme StreamingContext-objektin:
Seuraavaksi teemme yksinkertaisen kyselyn tietokantaan tarkistaaksemme tietueiden läsnäolon taulukossa tapahtuman_kulku:
Johtopäätös
Tässä artikkelissa tarkasteltiin esimerkkiä tietojen suorakäsittelystä käyttämällä Spark Streamingia yhdessä Apache Kafkan ja PostgreSQL:n kanssa. Eri lähteistä peräisin olevan datan lisääntyessä on vaikea yliarvioida Spark Streamingin käytännön arvoa suoratoisto- ja reaaliaikasovellusten luomisessa.
Löydät koko lähdekoodin arkistostani osoitteessa GitHub.
Keskustelen mielelläni tästä artikkelista, odotan kommenttejasi ja toivon myös rakentavaa kritiikkiä kaikilta välittäviltä lukijoilta.
Toivotan teille menestystä!
Ps. Aluksi oli tarkoitus käyttää paikallista PostgreSQL-tietokantaa, mutta koska rakkauteni AWS:ään, päätin siirtää tietokannan pilveen. Seuraavassa tätä aihetta käsittelevässä artikkelissa näytän, kuinka koko yllä kuvattu järjestelmä toteutetaan AWS:ssä AWS Kinesis- ja AWS EMR:n avulla. Seuraa uutisia!