Apache Kafka ja Streaming Data Processing with Spark Streaming

Hei, Habr! Tänään rakennamme järjestelmän, joka käsittelee Apache Kafka -viestivirtoja Spark Streamingin avulla ja kirjoittaa käsittelytulokset AWS RDS -pilvitietokantaan.

Kuvitellaan, että tietty luottolaitos asettaa meille tehtävän käsitellä saapuvat tapahtumat "lennossa" kaikissa sen konttoreissa. Tämä voidaan tehdä, jotta voidaan nopeasti laskea avoin valuuttapositio treasurylle, limiitit tai transaktioiden taloudelliset tulokset jne.

Kuinka toteuttaa tämä tapaus ilman taikuutta ja taikuutta - lue leikkauksen alta! Mennä!

Apache Kafka ja Streaming Data Processing with Spark Streaming
(Kuvan lähde)

Esittely

Tietenkin suuren tietomäärän käsittely reaaliajassa tarjoaa runsaasti käyttömahdollisuuksia nykyaikaisissa järjestelmissä. Yksi suosituimmista yhdistelmistä tähän on Apache Kafkan ja Spark Streamingin tandem, jossa Kafka luo virran saapuvista viestipaketteista ja Spark Streaming käsittelee nämä paketit tietyin aikavälein.

Käytämme tarkistuspisteitä lisätäksemme sovelluksen vikasietoisuutta. Tämän mekanismin avulla, kun Spark Streaming -moottorin on palautettava kadonneet tiedot, sen tarvitsee vain palata viimeiseen tarkistuspisteeseen ja jatkaa laskelmia sieltä.

Kehitetyn järjestelmän arkkitehtuuri

Apache Kafka ja Streaming Data Processing with Spark Streaming

Käytetyt komponentit:

  • Apache Kafka on hajautettu julkaisu-tilaa -viestijärjestelmä. Sopii sekä offline- että online-viestien kulutukseen. Tietojen katoamisen estämiseksi Kafka-viestit tallennetaan levylle ja kopioidaan klusterin sisällä. Kafka-järjestelmä on rakennettu ZooKeeper-synkronointipalvelun päälle;
  • Apache Spark -suoratoisto - Spark-komponentti suoratoistodatan käsittelyyn. Spark Streaming -moduuli on rakennettu käyttämällä mikroeräarkkitehtuuria, jossa datavirta tulkitaan jatkuvana pienten datapakettien sarjana. Spark Streaming ottaa dataa eri lähteistä ja yhdistää ne pieniksi paketeiksi. Uusia paketteja luodaan säännöllisin väliajoin. Kunkin aikavälin alussa luodaan uusi paketti, ja kaikki kyseisen ajanjakson aikana vastaanotettu data sisällytetään pakettiin. Intervallin lopussa pakettien kasvu pysähtyy. Intervallin koko määräytyy parametrilla, jota kutsutaan eräväliksi;
  • Apache Spark SQL - yhdistää relaatiokäsittelyn Spark-toiminnalliseen ohjelmointiin. Strukturoidulla tiedolla tarkoitetaan dataa, jolla on skeema, eli yksi kenttäjoukko kaikille tietueille. Spark SQL tukee syöttöä useista strukturoiduista tietolähteistä ja skeematietojen saatavuuden ansiosta se voi tehokkaasti hakea vain vaaditut tietuekentät ja tarjoaa myös DataFrame API:t;
  • AWS RDS on suhteellisen edullinen pilvipohjainen relaatiotietokanta, verkkopalvelu, joka yksinkertaistaa asennusta, käyttöä ja skaalausta ja jota hallinnoi suoraan Amazon.

Kafka-palvelimen asennus ja käyttö

Ennen kuin käytät Kafkaa suoraan, sinun on varmistettava, että sinulla on Java, koska... JVM:ää käytetään työhön:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Luodaan uusi käyttäjä työskentelemään Kafkan kanssa:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Lataa seuraavaksi jakelu Apache Kafkan viralliselta verkkosivustolta:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Pura ladattu arkisto:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Seuraava vaihe on valinnainen. Tosiasia on, että oletusasetukset eivät anna sinun käyttää kaikkia Apache Kafkan ominaisuuksia. Poista esimerkiksi aihe, luokka tai ryhmä, jolle viestejä voidaan julkaista. Muuttaaksesi tämän, muokataan asetustiedostoa:

vim ~/kafka/config/server.properties

Lisää tiedoston loppuun seuraava:

delete.topic.enable = true

Ennen kuin käynnistät Kafka-palvelimen, sinun on käynnistettävä ZooKeeper-palvelin; käytämme Kafka-jakelun mukana tulevaa apuohjelmaa:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Kun ZooKeeper on käynnistynyt onnistuneesti, käynnistä Kafka-palvelin erillisessä terminaalissa:

bin/kafka-server-start.sh config/server.properties

Luodaan uusi aihe nimeltä Tapahtuma:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Varmistetaan, että aihe, jossa on tarvittava määrä osioita ja replikaatiota, on luotu:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka ja Streaming Data Processing with Spark Streaming

Jätetään väliin tuottajan ja kuluttajan testaushetket juuri luodun aiheen suhteen. Lisätietoja viestien lähettämisen ja vastaanottamisen testaamisesta on kirjoitettu virallisessa dokumentaatiossa - Lähetä joitain viestejä. No, siirrymme tuottajan kirjoittamiseen Pythonissa KafkaProducer API:n avulla.

Tuottajan kirjoittaminen

Tuottaja tuottaa satunnaista dataa - 100 viestiä sekunnissa. Satunnaisilla tiedoilla tarkoitamme sanakirjaa, joka koostuu kolmesta kentästä:

  • Sivuliike — luottolaitoksen myyntipisteen nimi;
  • valuutta — tapahtuman valuutta;
  • Määrä - Siirtosumma. Summa on positiivinen luku, jos kyseessä on pankin valuutan osto, ja negatiivinen luku, jos kyseessä on myynti.

Valmistajan koodi näyttää tältä:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Seuraavaksi lähetämme lähetysmenetelmällä viestin palvelimelle, tarvitsemamme aiheeseen, JSON-muodossa:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Kun suoritamme komentosarjaa, saamme seuraavat viestit terminaaliin:

Apache Kafka ja Streaming Data Processing with Spark Streaming

Tämä tarkoittaa, että kaikki toimii niin kuin halusimme – tuottaja luo ja lähettää viestejä tarvitsemamme aiheeseen.
Seuraava vaihe on asentaa Spark ja käsitellä tämä viestivirta.

Apache Sparkin asennus

Apache Spark on yleinen ja tehokas klusterilaskenta-alusta.

Spark toimii paremmin kuin MapReduce-mallin suositut toteutukset ja tukee samalla laajempaa valikoimaa laskentatyyppejä, mukaan lukien interaktiiviset kyselyt ja stream-käsittely. Nopeudella on tärkeä rooli suuria tietomääriä käsiteltäessä, koska se on nopeus, jonka avulla voit työskennellä interaktiivisesti ilman minuutteja tai tunteja odottamiseen. Yksi Sparkin suurimmista vahvuuksista, jotka tekevät siitä niin nopean, on sen kyky suorittaa muistilaskelmia.

Tämä kehys on kirjoitettu Scalassa, joten sinun on asennettava se ensin:

sudo apt-get install scala

Lataa Spark-jakelu viralliselta verkkosivustolta:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Pura arkisto:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Lisää Spark-polku bash-tiedostoon:

vim ~/.bashrc

Lisää seuraavat rivit editorin kautta:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Suorita alla oleva komento, kun olet tehnyt muutoksia bashrc:iin:

source ~/.bashrc

AWS PostgreSQL:n käyttöönotto

Jäljelle jää vain ottaa käyttöön tietokanta, johon lataamme virroista käsitellyt tiedot. Käytämme tähän AWS RDS -palvelua.

Siirry AWS-konsoliin -> AWS RDS -> Tietokannat -> Luo tietokanta:
Apache Kafka ja Streaming Data Processing with Spark Streaming

Valitse PostgreSQL ja napsauta Seuraava:
Apache Kafka ja Streaming Data Processing with Spark Streaming

Koska Tämä esimerkki on tarkoitettu vain opetustarkoituksiin; käytämme ilmaista palvelinta "vähintään" (Free Tier):
Apache Kafka ja Streaming Data Processing with Spark Streaming

Seuraavaksi laitamme rastin Free Tier -lohkoon, ja sen jälkeen meille tarjotaan automaattisesti t2.micro-luokan esiintymä - vaikka se on heikko, se on ilmainen ja sopiva tehtäväämme:
Apache Kafka ja Streaming Data Processing with Spark Streaming

Seuraavaksi tulevat erittäin tärkeät asiat: tietokannan ilmentymän nimi, pääkäyttäjän nimi ja hänen salasanansa. Nimetään ilmentymä: myHabrTest, pääkäyttäjä: habr, Salasana: habr12345 ja napsauta Seuraava-painiketta:
Apache Kafka ja Streaming Data Processing with Spark Streaming

Seuraavalla sivulla on parametrit, jotka vastaavat tietokantapalvelimemme käytettävyydestä ulkopuolelta (Julkinen saavutettavuus) ja porttien saatavuudesta:

Apache Kafka ja Streaming Data Processing with Spark Streaming

Luodaan VPC-suojausryhmälle uusi asetus, joka mahdollistaa ulkoisen pääsyn tietokantapalvelimellemme portin 5432 (PostgreSQL) kautta.
Siirrytään AWS-konsoliin erillisessä selainikkunassa VPC Dashboard -> Suojausryhmät -> Luo suojausryhmä -osioon:
Apache Kafka ja Streaming Data Processing with Spark Streaming

Asetamme suojausryhmän nimen - PostgreSQL, kuvauksen, ilmoitamme mihin VPC:hen tämä ryhmä tulee liittää ja napsauta Luo-painiketta:
Apache Kafka ja Streaming Data Processing with Spark Streaming

Täytä portin 5432 saapuvat säännöt äskettäin luodulle ryhmälle alla olevan kuvan mukaisesti. Et voi määrittää porttia manuaalisesti, vaan valitse avattavasta Type-luettelosta PostgreSQL.

Tarkkaan ottaen arvo ::/0 tarkoittaa saapuvan liikenteen saatavuutta palvelimelle kaikkialta maailmasta, mikä ei kanonisesti ole täysin totta, mutta esimerkin analysoimiseksi sallitaanpa käyttää tätä lähestymistapaa:
Apache Kafka ja Streaming Data Processing with Spark Streaming

Palaamme selainsivulle, jossa on "Määritä lisäasetukset" auki ja valitse VPC-suojausryhmät -> Valitse olemassa olevat VPC-suojausryhmät -> PostgreSQL:
Apache Kafka ja Streaming Data Processing with Spark Streaming

Seuraavaksi aseta tietokannan asetukset -> Tietokannan nimi -> nimi - habrDB.

Voimme jättää muut parametrit, lukuun ottamatta varmuuskopioinnin (varmuuskopion säilytysaika - 0 päivää), seurantaa ja Performance Insights -toimintoja, oletusarvoisesti. Napsauta painiketta Luo tietokanta:
Apache Kafka ja Streaming Data Processing with Spark Streaming

Langan käsittelijä

Viimeisessä vaiheessa kehitetään Spark-työ, joka käsittelee kahden sekunnin välein Kafkasta tulevaa uutta tietoa ja syöttää tuloksen tietokantaan.

Kuten yllä todettiin, tarkistuspisteet ovat SparkStreamingin ydinmekanismi, joka on määritettävä vikasietoisuuden varmistamiseksi. Käytämme tarkistuspisteitä, ja jos toimenpide epäonnistuu, Spark Streaming -moduulin tarvitsee vain palata viimeiseen tarkistuspisteeseen ja jatkaa laskelmia siitä palauttaakseen kadonneet tiedot.

Tarkistuspisteen voi ottaa käyttöön asettamalla vikasietoiseen, luotettavaan tiedostojärjestelmään (kuten HDFS, S3 jne.) hakemiston, johon tarkistuspisteen tiedot tallennetaan. Tämä tehdään käyttämällä esim.

streamingContext.checkpoint(checkpointDirectory)

Esimerkissämme käytämme seuraavaa lähestymistapaa, nimittäin jos checkpointDirectory on olemassa, konteksti luodaan uudelleen tarkistuspistetiedoista. Jos hakemistoa ei ole olemassa (eli suoritetaan ensimmäistä kertaa), functionToCreateContext kutsutaan luomaan uusi konteksti ja määrittämään DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Luomme DirectStream-objektin muodostamaan yhteyden "tapahtuma"-aiheeseen KafkaUtils-kirjaston createDirectStream-menetelmällä:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Saapuvien tietojen jäsentäminen JSON-muodossa:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Spark SQL:n avulla teemme yksinkertaisen ryhmittelyn ja näytämme tuloksen konsolissa:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Kyselytekstin hakeminen ja sen suorittaminen Spark SQL:n kautta:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Ja sitten tallennamme tuloksena saadut aggregoidut tiedot taulukkoon AWS RDS:ssä. Tallentaaksemme koontitulokset tietokantataulukkoon käytämme DataFrame-objektin kirjoitusmenetelmää:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Muutama sana yhteyden muodostamisesta AWS RDS:ään. Loimme sille käyttäjän ja salasanan "AWS PostgreSQL:n käyttöönotto" -vaiheessa. Sinun tulisi käyttää Endpointia tietokantapalvelimen URL-osoitteena, joka näkyy Yhteydet ja suojaus -osiossa:

Apache Kafka ja Streaming Data Processing with Spark Streaming

Jotta Spark ja Kafka voidaan yhdistää oikein, sinun tulee suorittaa työ smark-submitilla käyttämällä artefaktia spark-streaming-kafka-0-8_2.11. Lisäksi käytämme myös artefaktia vuorovaikutuksessa PostgreSQL-tietokannan kanssa, siirrämme ne ---pakettien kautta.

Skriptin joustavuuden vuoksi sisällytämme syöttöparametreiksi myös viestipalvelimen nimen ja aiheen, josta tietoja halutaan vastaanottaa.

Joten on aika käynnistää ja tarkistaa järjestelmän toimivuus:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Kaikki sujui! Kuten alla olevasta kuvasta näkyy, sovelluksen ollessa käynnissä uudet koontitulokset tulostetaan 2 sekunnin välein, koska asetimme eräväliksi 2 sekuntia, kun loimme StreamingContext-objektin:

Apache Kafka ja Streaming Data Processing with Spark Streaming

Seuraavaksi teemme yksinkertaisen kyselyn tietokantaan tarkistaaksemme tietueiden läsnäolon taulukossa tapahtuman_kulku:

Apache Kafka ja Streaming Data Processing with Spark Streaming

Johtopäätös

Tässä artikkelissa tarkasteltiin esimerkkiä tietojen suorakäsittelystä käyttämällä Spark Streamingia yhdessä Apache Kafkan ja PostgreSQL:n kanssa. Eri lähteistä peräisin olevan datan lisääntyessä on vaikea yliarvioida Spark Streamingin käytännön arvoa suoratoisto- ja reaaliaikasovellusten luomisessa.

Löydät koko lähdekoodin arkistostani osoitteessa GitHub.

Keskustelen mielelläni tästä artikkelista, odotan kommenttejasi ja toivon myös rakentavaa kritiikkiä kaikilta välittäviltä lukijoilta.

Toivotan teille menestystä!

Ps. Aluksi oli tarkoitus käyttää paikallista PostgreSQL-tietokantaa, mutta koska rakkauteni AWS:ään, päätin siirtää tietokannan pilveen. Seuraavassa tätä aihetta käsittelevässä artikkelissa näytän, kuinka koko yllä kuvattu järjestelmä toteutetaan AWS:ssä AWS Kinesis- ja AWS EMR:n avulla. Seuraa uutisia!

Lähde: will.com

Lisää kommentti