Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

Szia Habr! Ma olyan rendszert építünk, amely feldolgozza az Apache Kafka üzenetfolyamokat a Spark Streaming segítségével, és a feldolgozás eredményeit az AWS RDS felhőadatbázisba írja.

Képzeljük el, hogy egy bizonyos hitelintézet azt a feladatot tűzi ki elénk, hogy minden fiókjában „menet közben” dolgozzuk fel a beérkező tranzakciókat. Ez megtehető a kincstári nyitott devizapozíció azonnali kiszámítása, a tranzakciók limitjei vagy pénzügyi eredményei stb.

Hogyan valósítsuk meg ezt az esetet varázslatok és varázsigék használata nélkül – olvassa el a vágás alatt! Megy!

Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével
(Kép forrása)

Bevezetés

Természetesen a nagy mennyiségű adat valós idejű feldolgozása bőséges lehetőséget kínál a modern rendszerekben való felhasználásra. Ennek egyik legnépszerűbb kombinációja az Apache Kafka és a Spark Streaming tandem, ahol a Kafka folyamot hoz létre a bejövő üzenetcsomagokból, a Spark Streaming pedig adott időintervallumban dolgozza fel ezeket a csomagokat.

Az alkalmazás hibatűrésének növelése érdekében ellenőrzőpontokat használunk. Ezzel a mechanizmussal, amikor a Spark Streaming motornak vissza kell állítania az elveszett adatokat, csak vissza kell térnie az utolsó ellenőrzőponthoz, és onnan kell folytatnia a számításokat.

A kidolgozott rendszer felépítése

Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

Felhasznált komponensek:

  • Apache Kafka egy elosztott közzététel-előfizetés üzenetküldő rendszer. Offline és online üzenetek fogyasztására egyaránt alkalmas. Az adatvesztés elkerülése érdekében a Kafka-üzeneteket a rendszer a lemezen tárolja, és a fürtön belül replikálja. A Kafka rendszer a ZooKeeper szinkronizálási szolgáltatásra épül;
  • Apache Spark Streaming - Spark komponens streaming adatok feldolgozásához. A Spark Streaming modul mikro kötegelt architektúrával épül fel, ahol az adatfolyamot kis adatcsomagok folyamatos sorozataként értelmezik. A Spark Streaming különböző forrásokból veszi az adatokat, és kis csomagokba egyesíti. Rendszeres időközönként új csomagok jönnek létre. Minden időintervallum elején egy új csomag jön létre, és az adott intervallum alatt kapott adatok belekerülnek a csomagba. Az intervallum végén a csomagok növekedése leáll. Az intervallum méretét a kötegelt intervallumnak nevezett paraméter határozza meg;
  • Apache Spark SQL - egyesíti a relációs feldolgozást a Spark funkcionális programozásával. A strukturált adatok olyan adatokat jelentenek, amelyeknek sémája van, azaz egyetlen mezőkészlet minden rekordhoz. A Spark SQL számos strukturált adatforrásból támogatja a bevitelt, és a sémainformációk elérhetőségének köszönhetően csak a szükséges rekordmezőket tudja hatékonyan lekérni, valamint DataFrame API-kat is biztosít;
  • AWS RDS egy viszonylag olcsó felhő alapú relációs adatbázis, webszolgáltatás, amely leegyszerűsíti a beállítást, a működést és a méretezést, és közvetlenül az Amazon adminisztrálja.

A Kafka szerver telepítése és futtatása

A Kafka közvetlen használata előtt meg kell győződnie arról, hogy rendelkezik Java-val, mert... A JVM-et a következő munkákhoz használják:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Hozzon létre egy új felhasználót a Kafkával való együttműködéshez:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Ezután töltse le a disztribúciót az Apache Kafka hivatalos webhelyéről:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Csomagolja ki a letöltött archívumot:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

A következő lépés nem kötelező. Az a tény, hogy az alapértelmezett beállítások nem teszik lehetővé az Apache Kafka összes funkciójának teljes körű használatát. Például töröljön egy témát, kategóriát, csoportot, amelyhez üzeneteket lehet közzétenni. Ennek megváltoztatásához szerkesszük a konfigurációs fájlt:

vim ~/kafka/config/server.properties

Adja hozzá a következőt a fájl végéhez:

delete.topic.enable = true

A Kafka szerver elindítása előtt el kell indítania a ZooKeeper szervert; a Kafka disztribúcióhoz mellékelt segédszkriptet fogjuk használni:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Miután a ZooKeeper sikeresen elindult, indítsa el a Kafka szervert egy külön terminálon:

bin/kafka-server-start.sh config/server.properties

Hozzunk létre egy új témát Tranzakció néven:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Győződjön meg arról, hogy létrejött egy témakör a szükséges számú partícióval és replikációval:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

Hagyjuk ki a termelő és fogyasztó tesztelésének pillanatait az újonnan létrehozott témához. Az üzenetek küldésének és fogadásának tesztelésével kapcsolatos további részletek a hivatalos dokumentációban találhatók - Küldj néhány üzenetet. Nos, áttérünk arra, hogy a KafkaProducer API segítségével Pythonban készítsünk producert.

Producer írása

A gyártó véletlenszerű adatokat generál – másodpercenként 100 üzenetet. Véletlenszerű adatokon egy három mezőből álló szótárat értünk:

  • Ág — a hitelintézet értékesítési helyének neve;
  • Valuta — tranzakció pénzneme;
  • Összeg — tranzakció összege. Az összeg pozitív szám lesz, ha a Bank valutavásárlásáról van szó, és negatív szám, ha eladásról van szó.

A gyártó kódja így néz ki:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Ezután a küldési módszerrel üzenetet küldünk a szervernek, a kívánt témához, JSON formátumban:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

A szkript futtatásakor a következő üzeneteket kapjuk a terminálban:

Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

Ez azt jelenti, hogy minden úgy működik, ahogy akartuk – a producer generál és küld üzeneteket a számunkra szükséges témához.
A következő lépés a Spark telepítése és az üzenetfolyam feldolgozása.

Az Apache Spark telepítése

Apache Spark egy univerzális és nagy teljesítményű fürt számítástechnikai platform.

A Spark jobban teljesít, mint a MapReduce modell népszerű megvalósításai, miközben a számítási típusok szélesebb körét támogatja, beleértve az interaktív lekérdezéseket és az adatfolyam-feldolgozást. A sebesség fontos szerepet játszik nagy mennyiségű adat feldolgozásakor, mivel ez a sebesség teszi lehetővé az interaktív munkát anélkül, hogy perceket vagy órákat kellene várakoznia. A Spark egyik legnagyobb erőssége, ami ilyen gyorssá teszi, az a képessége, hogy memórián belüli számításokat végezhet.

Ez a keretrendszer Scalában van írva, ezért először telepítenie kell:

sudo apt-get install scala

Töltse le a Spark disztribúciót a hivatalos webhelyről:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Csomagolja ki az archívumot:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Adja hozzá a Spark elérési útját a bash fájlhoz:

vim ~/.bashrc

Adja hozzá a következő sorokat a szerkesztőn keresztül:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

A bashrc módosítása után futtassa az alábbi parancsot:

source ~/.bashrc

Az AWS PostgreSQL telepítése

Már csak az adatbázis telepítése van hátra, amelybe feltöltjük a streamekből feldolgozott információkat. Ehhez az AWS RDS szolgáltatást fogjuk használni.

Lépjen az AWS konzolra -> AWS RDS -> Adatbázisok -> Adatbázis létrehozása:
Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

Válassza ki a PostgreSQL-t, és kattintson a Tovább gombra:
Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

Mert Ez a példa csak oktatási célokat szolgál; „legalább” ingyenes szervert fogunk használni (Free Tier):
Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

Ezután bejelöljük a Free Tier blokkot, és ezután automatikusan felajánljuk a t2.micro osztály egy példányát - bár gyenge, de ingyenes és a mi feladatunkra teljesen alkalmas:
Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

Ezután nagyon fontos dolgok következnek: az adatbázispéldány neve, a fő felhasználó neve és jelszava. Nevezzük el a példányt: myHabrTest, fő felhasználó: habr, Jelszó: habr12345 és kattintson a Tovább gombra:
Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

A következő oldalon az adatbázisszerverünk kívülről való elérhetőségéért (Nyilvános hozzáférhetőség) és a port elérhetőségéért felelős paraméterek találhatók:

Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

Hozzunk létre egy új beállítást a VPC biztonsági csoporthoz, amely külső hozzáférést tesz lehetővé adatbázis-szerverünkhöz az 5432-es porton (PostgreSQL) keresztül.
Menjünk az AWS konzolra egy külön böngészőablakban a VPC Dashboard -> Biztonsági csoportok -> Biztonsági csoport létrehozása szakaszba:
Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

Beállítjuk a biztonsági csoport nevét - PostgreSQL, leírást, jelezzük, hogy melyik VPC-hez kell társítani ezt a csoportot, és kattintson a Létrehozás gombra:
Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

Töltse ki az 5432-es port bejövő szabályait az újonnan létrehozott csoporthoz az alábbi képen látható módon. A portot nem adhatja meg manuálisan, hanem válassza ki a PostgreSQL-t a Típus legördülő listából.

Szigorúan véve a ::/0 érték a bejövő forgalom elérhetőségét jelenti a szerver felé a világ minden tájáról, ami kanonikusan nem teljesen igaz, de a példa elemzéséhez engedjük meg magunknak ezt a megközelítést:
Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

Visszatérünk a böngésző oldalra, ahol megnyílik a „Speciális beállítások konfigurálása”, és a VPC biztonsági csoportok részben válassza ki a -> Select meglévő VPC biztonsági csoportokat -> PostgreSQL:
Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

Ezután az Adatbázis beállításai -> Adatbázis neve -> állítsa be a nevet - habrDB.

A többi paramétert a biztonsági mentés letiltása (a biztonsági mentés megőrzési ideje - 0 nap), a monitorozás és a Performance Insights kivételével alapértelmezés szerint hagyhatjuk. Kattintson a gombra Adatbázis létrehozása:
Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

Szálkezelő

Az utolsó szakasz egy Spark-feladat kidolgozása lesz, amely két másodpercenként feldolgozza a Kafkától érkező új adatokat, és az eredményt beviszi az adatbázisba.

Amint fentebb megjegyeztük, az ellenőrzőpontok a SparkStreaming egyik alapvető mechanizmusa, amelyet a hibatűrés biztosítása érdekében konfigurálni kell. Ellenőrzőpontokat fogunk használni, és ha az eljárás sikertelen, a Spark Streaming modulnak csak az utolsó ellenőrzőponthoz kell visszatérnie, és onnan kell folytatnia a számításokat az elveszett adatok helyreállításához.

Az ellenőrzőpontokat úgy lehet engedélyezni, hogy egy hibatűrő, megbízható fájlrendszeren (például HDFS, S3 stb.) beállítunk egy könyvtárat, amelyben az ellenőrzési pontok információi kerülnek tárolásra. Ez például a következők használatával történik:

streamingContext.checkpoint(checkpointDirectory)

Példánkban a következő megközelítést fogjuk használni, nevezetesen, ha a checkpointDirectory létezik, akkor a kontextus újra létrejön az ellenőrzőpont adataiból. Ha a könyvtár nem létezik (azaz első alkalommal fut le), akkor a functionToCreateContext meghívásra kerül egy új kontextus létrehozása és a DStreams konfigurálása:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Létrehozunk egy DirectStream objektumot, amely a KafkaUtils könyvtár createDirectStream metódusával kapcsolódik a „tranzakciós” témához:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Bejövő adatok elemzése JSON formátumban:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

A Spark SQL használatával egyszerű csoportosítást végzünk, és az eredményt megjelenítjük a konzolon:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

A lekérdezés szövegének lekérése és futtatása Spark SQL-en keresztül:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Ezután az eredményül kapott összesített adatokat egy táblázatba mentjük az AWS RDS-ben. Az összesítési eredmények adatbázistáblába mentéséhez a DataFrame objektum írási metódusát fogjuk használni:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Néhány szó az AWS RDS-hez való csatlakozás beállításáról. A felhasználót és a jelszót az „AWS PostgreSQL telepítése” lépésben hoztuk létre. Használja az Endpointot adatbázis-kiszolgáló URL-jeként, amely a Kapcsolatok és biztonság részben jelenik meg:

Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

A Spark és a Kafka megfelelő összekapcsolásához futtassa a feladatot a smark-submit segítségével a műtermék használatával szikra-streaming-kafka-0-8_2.11. Ezenkívül egy műterméket is használunk a PostgreSQL adatbázissal való interakcióhoz; ezeket --csomagokon keresztül továbbítjuk.

A szkript rugalmassága érdekében bemeneti paraméterként megadjuk az üzenetszerver nevét és a témát is, ahonnan adatokat szeretnénk fogadni.

Tehát itt az ideje elindítani és ellenőrizni a rendszer működését:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Minden sikerült! Ahogy az alábbi képen is látható, az alkalmazás futása közben 2 másodpercenként jelennek meg az új összesítési eredmények, mivel a StreamingContext objektum létrehozásakor a kötegelési intervallumot 2 másodpercre állítottuk be:

Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

Ezután egyszerű lekérdezést végzünk az adatbázisban, hogy ellenőrizzük a rekordok jelenlétét a táblában tranzakció_folyamat:

Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével

Következtetés

Ez a cikk egy példát mutatott be a Spark Streaming és az Apache Kafka és a PostgreSQL együttes használatával végzett adatfolyam-feldolgozásra. A különböző forrásokból származó adatok növekedésével nehéz túlbecsülni a Spark Streaming gyakorlati értékét az adatfolyam- és valós idejű alkalmazások létrehozásában.

A teljes forráskódot megtalálja a tárhelyemben a címen GitHub.

Szívesen megvitatom ezt a cikket, várom észrevételeiket, és építő kritikát is remélek minden figyelmes olvasótól.

Sok sikert kívánok!

Zsolt. Kezdetben egy helyi PostgreSQL adatbázist terveztek használni, de az AWS iránti szeretetem miatt úgy döntöttem, hogy áthelyezem az adatbázist a felhőbe. A témával foglalkozó következő cikkben bemutatom, hogyan lehet megvalósítani a fent leírt teljes rendszert AWS-ben AWS Kinesis és AWS EMR segítségével. Kövesd a híreket!

Forrás: will.com

Hozzászólás