Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Pozdravljeni, Habr! Danes bomo zgradili sistem, ki bo obdeloval tokove sporočil Apache Kafka z uporabo Spark Streaming in zapisoval rezultate obdelave v bazo podatkov v oblaku AWS RDS.

Predstavljajmo si, da nam določena kreditna institucija zada nalogo obdelave dohodnih transakcij "sproti" v vseh njenih podružnicah. To je mogoče storiti za namene takojšnjega izračuna odprte valutne pozicije za zakladnico, limitov ali finančnih rezultatov za transakcije itd.

Kako izvesti ta primer brez uporabe magije in čarobnih urokov - preberite pod rezom! Pojdi!

Apache Kafka in pretočna obdelava podatkov s Spark Streaming
(Vir slike)

Predstavitev

Seveda pa obdelava velike količine podatkov v realnem času ponuja veliko možnosti za uporabo v sodobnih sistemih. Ena najbolj priljubljenih kombinacij za to je tandem Apache Kafka in Spark Streaming, kjer Kafka ustvari tok dohodnih paketov sporočil, Spark Streaming pa te pakete obdela v danem časovnem intervalu.

Za povečanje odpornosti na napake aplikacije bomo uporabili kontrolne točke. S tem mehanizmom, ko mora motor Spark Streaming obnoviti izgubljene podatke, se mora samo vrniti na zadnjo kontrolno točko in nadaljevati izračune od tam.

Arhitektura razvitega sistema

Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Uporabljene komponente:

  • Apache Kafka je porazdeljen sistem za pošiljanje sporočil objava-naročanje. Primerno za uporabo sporočil brez povezave in na spletu. Da bi preprečili izgubo podatkov, so sporočila Kafka shranjena na disku in podvojena znotraj gruče. Sistem Kafka je zgrajen na vrhu sinhronizacijske storitve ZooKeeper;
  • Apache Spark Streaming - Komponenta Spark za obdelavo pretočnih podatkov. Modul Spark Streaming je zgrajen z mikro-paketno arhitekturo, kjer se podatkovni tok interpretira kot neprekinjeno zaporedje majhnih podatkovnih paketov. Spark Streaming vzame podatke iz različnih virov in jih združi v majhne pakete. Novi paketi se ustvarjajo v rednih intervalih. Na začetku vsakega časovnega intervala se ustvari nov paket in vsi podatki, prejeti v tem intervalu, so vključeni v paket. Ob koncu intervala se rast paketov ustavi. Velikost intervala je določena s parametrom, imenovanim paketni interval;
  • Apache Spark SQL - združuje relacijsko obdelavo s funkcionalnim programiranjem Spark. Strukturirani podatki pomenijo podatke, ki imajo shemo, to je en niz polj za vse zapise. Spark SQL podpira vnos iz različnih virov strukturiranih podatkov in lahko zaradi razpoložljivosti informacij o shemah učinkovito pridobi samo zahtevana polja zapisov ter ponuja tudi API-je DataFrame;
  • AWS RDS je razmeroma poceni relacijska zbirka podatkov v oblaku, spletna storitev, ki poenostavlja nastavitev, delovanje in skaliranje in jo neposredno upravlja Amazon.

Namestitev in zagon strežnika Kafka

Pred neposredno uporabo Kafke se morate prepričati, da imate Javo, ker ... JVM se uporablja za delo:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Ustvarimo novega uporabnika za delo s Kafko:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Nato prenesite distribucijo z uradne spletne strani Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Razpakirajte preneseni arhiv:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Naslednji korak ni obvezen. Dejstvo je, da privzete nastavitve ne omogočajo popolne uporabe vseh zmožnosti Apache Kafka. Na primer, izbrišite temo, kategorijo, skupino, v kateri je mogoče objaviti sporočila. Če želite to spremeniti, uredimo konfiguracijsko datoteko:

vim ~/kafka/config/server.properties

Na konec datoteke dodajte naslednje:

delete.topic.enable = true

Preden zaženete strežnik Kafka, morate zagnati strežnik ZooKeeper; uporabili bomo pomožni skript, ki je priložen distribuciji Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Ko se ZooKeeper uspešno zažene, zaženite strežnik Kafka v ločenem terminalu:

bin/kafka-server-start.sh config/server.properties

Ustvarimo novo temo z imenom Transakcija:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Prepričajmo se, da je bila ustvarjena tema z zahtevanim številom particij in replikacije:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Zamudimo trenutke preizkušanja proizvajalca in potrošnika za novo ustvarjeno temo. Več podrobnosti o tem, kako lahko preizkusite pošiljanje in prejemanje sporočil, je napisano v uradni dokumentaciji - Pošlji nekaj sporočil. No, preidimo na pisanje producenta v Pythonu z uporabo API-ja KafkaProducer.

Pisanje producenta

Proizvajalec bo ustvaril naključne podatke - 100 sporočil vsako sekundo. Z naključnimi podatki razumemo slovar, sestavljen iz treh polj:

  • Branch — ime prodajnega mesta kreditne institucije;
  • valuta — valuta transakcije;
  • znesek — znesek transakcije. Znesek bo pozitivno število, če gre za nakup valute s strani banke, in negativno število, če gre za prodajo.

Koda za proizvajalca izgleda takole:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Nato z metodo pošiljanja pošljemo sporočilo na strežnik, na temo, ki jo potrebujemo, v formatu JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Ko izvajamo skript, dobimo v terminalu naslednja sporočila:

Apache Kafka in pretočna obdelava podatkov s Spark Streaming

To pomeni, da vse deluje tako, kot smo želeli - proizvajalec generira in pošilja sporočila na temo, ki jo potrebujemo.
Naslednji korak je namestitev Spark in obdelava tega toka sporočil.

Namestitev Apache Spark

Apache Spark je univerzalna in visoko zmogljiva grozdna računalniška platforma.

Spark deluje bolje kot priljubljene izvedbe modela MapReduce, hkrati pa podpira širši nabor vrst izračuna, vključno z interaktivnimi poizvedbami in obdelavo toka. Hitrost igra pomembno vlogo pri obdelavi velikih količin podatkov, saj je hitrost tista, ki vam omogoča interaktivno delo, ne da bi porabili minute ali ure čakanja. Ena največjih prednosti Spark, zaradi katere je tako hiter, je njegova sposobnost izvajanja izračunov v pomnilniku.

To ogrodje je napisano v Scali, zato ga morate najprej namestiti:

sudo apt-get install scala

Prenesite distribucijo Spark z uradnega spletnega mesta:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Razpakirajte arhiv:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Dodajte pot do Spark v datoteko bash:

vim ~/.bashrc

Skozi urejevalnik dodajte naslednje vrstice:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Po spremembah bashrc zaženite spodnji ukaz:

source ~/.bashrc

Uvajanje AWS PostgreSQL

Ostaja le še razporeditev podatkovne baze, v katero bomo naložili obdelane informacije iz tokov. Za to bomo uporabili storitev AWS RDS.

Pojdite na konzolo AWS -> AWS RDS -> Baze podatkov -> Ustvari bazo podatkov:
Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Izberite PostgreSQL in kliknite Naprej:
Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Ker Ta primer je samo za izobraževalne namene; uporabili bomo "najmanj" brezplačen strežnik (Free Tier):
Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Nato postavimo kljukico v blok Free Tier in po tem nam bo samodejno ponujen primerek razreda t2.micro - čeprav šibek, je brezplačen in zelo primeren za našo nalogo:
Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Sledijo zelo pomembne stvari: ime instance baze podatkov, ime glavnega uporabnika in njegovo geslo. Poimenujmo primerek: myHabrTest, glavni uporabnik: habr, geslo: habr12345 in kliknite na gumb Naprej:
Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Na naslednji strani so parametri, ki so odgovorni za dostopnost našega strežnika baze podatkov od zunaj (javna dostopnost) in razpoložljivost vrat:

Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Ustvarimo novo nastavitev za varnostno skupino VPC, ki bo omogočila zunanji dostop do našega strežnika baze podatkov prek vrat 5432 (PostgreSQL).
Pojdimo na konzolo AWS v ločenem oknu brskalnika na nadzorno ploščo VPC -> Varnostne skupine -> razdelek Ustvari varnostno skupino:
Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Nastavimo ime za varnostno skupino - PostgreSQL, opis, navedemo, s katerim VPC naj bo ta skupina povezana, in kliknemo gumb Ustvari:
Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Izpolnite vhodna pravila za vrata 5432 za novo ustvarjeno skupino, kot je prikazano na spodnji sliki. Vrata ne morete določiti ročno, ampak izberite PostgreSQL s spustnega seznama Vrsta.

Strogo gledano vrednost ::/0 pomeni razpoložljivost dohodnega prometa na strežnik z vsega sveta, kar kanonično ni povsem res, a za analizo primera si dovolimo uporabiti ta pristop:
Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Vrnemo se na stran brskalnika, kjer imamo odprto “Konfiguriraj napredne nastavitve” in v razdelku varnostne skupine VPC izberemo -> Izberi obstoječe varnostne skupine VPC -> PostgreSQL:
Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Nato v možnostih baze podatkov -> Ime baze podatkov -> nastavite ime - habrDB.

Preostale parametre, z izjemo onemogočanja varnostnega kopiranja (obdobje hrambe varnostne kopije - 0 dni), spremljanja in Performance Insights, lahko pustimo privzeto. Kliknite na gumb Ustvarite bazo podatkov:
Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Upravljavec niti

Zadnja faza bo razvoj opravila Spark, ki bo vsaki dve sekundi obdelalo nove podatke, ki prihajajo iz Kafke, in rezultat vneslo v bazo podatkov.

Kot je navedeno zgoraj, so kontrolne točke osrednji mehanizem v SparkStreamingu, ki ga je treba konfigurirati za zagotavljanje tolerance napak. Uporabili bomo kontrolne točke in, če postopek ne uspe, se bo moral modul Spark Streaming vrniti samo na zadnjo kontrolno točko in nadaljevati izračune z nje, da obnovi izgubljene podatke.

Kontrolne točke je mogoče omogočiti z nastavitvijo imenika v zanesljivem datotečnem sistemu, ki je odporen na napake (kot je HDFS, S3 itd.), v katerem bodo shranjene informacije o kontrolnih točkah. To se naredi na primer z:

streamingContext.checkpoint(checkpointDirectory)

V našem primeru bomo uporabili naslednji pristop, in sicer, če checkpointDirectory obstaja, bo kontekst znova ustvarjen iz podatkov o kontrolni točki. Če imenik ne obstaja (tj. prvič izveden), se pokliče functionToCreateContext, da ustvari nov kontekst in konfigurira DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Ustvarimo objekt DirectStream za povezavo s temo »transakcija« z uporabo metode createDirectStream knjižnice KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Razčlenjevanje dohodnih podatkov v formatu JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Z uporabo Spark SQL naredimo preprosto združevanje in prikažemo rezultat v konzoli:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Pridobivanje besedila poizvedbe in izvajanje prek Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Nato dobljene združene podatke shranimo v tabelo v AWS RDS. Za shranjevanje rezultatov združevanja v tabelo baze podatkov bomo uporabili metodo pisanja objekta DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Nekaj ​​besed o nastavitvi povezave z AWS RDS. Uporabnika in geslo zanj smo ustvarili v koraku »Uvajanje AWS PostgreSQL«. Končno točko bi morali uporabiti kot URL strežnika zbirke podatkov, ki je prikazan v razdelku Povezljivost in varnost:

Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Če želite pravilno povezati Spark in Kafko, morate zagnati opravilo prek smark-submit z uporabo artefakta spark-streaming-kafka-0-8_2.11. Dodatno bomo uporabili tudi artefakt za interakcijo z bazo PostgreSQL, prenašali jih bomo preko --packages.

Za prilagodljivost skripte bomo kot vhodne parametre vključili tudi ime sporočilnega strežnika in temo iz katere želimo prejemati podatke.

Torej, čas je, da zaženete in preverite delovanje sistema:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Vse se je izšlo! Kot lahko vidite na spodnji sliki, se med izvajanjem aplikacije novi rezultati združevanja prikažejo vsaki 2 sekundi, ker smo interval paketnega zbiranja nastavili na 2 sekundi, ko smo ustvarili objekt StreamingContext:

Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Nato naredimo preprosto poizvedbo v bazi podatkov, da preverimo prisotnost zapisov v tabeli transakcijski_tok:

Apache Kafka in pretočna obdelava podatkov s Spark Streaming

Zaključek

Ta članek je obravnaval primer pretočne obdelave informacij z uporabo Spark Streaming v povezavi z Apache Kafka in PostgreSQL. Z rastjo podatkov iz različnih virov je težko preceniti praktično vrednost Spark Streaminga za ustvarjanje pretočnih in realnočasovnih aplikacij.

Celotno izvorno kodo lahko najdete v mojem skladišču na GitHub.

Z veseljem razpravljam o tem članku, veselim se vaših komentarjev in upam tudi na konstruktivno kritiko vseh skrbnih bralcev.

Želim vam uspeh!

Ps. Sprva je bilo načrtovano, da bom uporabljal lokalno bazo podatkov PostgreSQL, vendar sem se glede na mojo ljubezen do AWS odločil bazo podatkov preseliti v oblak. V naslednjem članku na to temo bom pokazal, kako implementirati celoten zgoraj opisan sistem v AWS z uporabo AWS Kinesis in AWS EMR. Spremljajte novice!

Vir: www.habr.com

Dodaj komentar