ProHoster > Blog > Adminisztráció > Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével
Apache Kafka és streaming adatfeldolgozás a Spark Streaming segítségével
Szia Habr! Ma olyan rendszert építünk, amely feldolgozza az Apache Kafka üzenetfolyamokat a Spark Streaming segítségével, és a feldolgozás eredményeit az AWS RDS felhőadatbázisba írja.
Képzeljük el, hogy egy bizonyos hitelintézet azt a feladatot tűzi ki elénk, hogy minden fiókjában „menet közben” dolgozzuk fel a beérkező tranzakciókat. Ez megtehető a kincstári nyitott devizapozíció azonnali kiszámítása, a tranzakciók limitjei vagy pénzügyi eredményei stb.
Hogyan valósítsuk meg ezt az esetet varázslatok és varázsigék használata nélkül – olvassa el a vágás alatt! Megy!
Természetesen a nagy mennyiségű adat valós idejű feldolgozása bőséges lehetőséget kínál a modern rendszerekben való felhasználásra. Ennek egyik legnépszerűbb kombinációja az Apache Kafka és a Spark Streaming tandem, ahol a Kafka folyamot hoz létre a bejövő üzenetcsomagokból, a Spark Streaming pedig adott időintervallumban dolgozza fel ezeket a csomagokat.
Az alkalmazás hibatűrésének növelése érdekében ellenőrzőpontokat használunk. Ezzel a mechanizmussal, amikor a Spark Streaming motornak vissza kell állítania az elveszett adatokat, csak vissza kell térnie az utolsó ellenőrzőponthoz, és onnan kell folytatnia a számításokat.
A kidolgozott rendszer felépítése
Felhasznált komponensek:
Apache Kafka egy elosztott közzététel-előfizetés üzenetküldő rendszer. Offline és online üzenetek fogyasztására egyaránt alkalmas. Az adatvesztés elkerülése érdekében a Kafka-üzeneteket a rendszer a lemezen tárolja, és a fürtön belül replikálja. A Kafka rendszer a ZooKeeper szinkronizálási szolgáltatásra épül;
Apache Spark Streaming - Spark komponens streaming adatok feldolgozásához. A Spark Streaming modul mikro kötegelt architektúrával épül fel, ahol az adatfolyamot kis adatcsomagok folyamatos sorozataként értelmezik. A Spark Streaming különböző forrásokból veszi az adatokat, és kis csomagokba egyesíti. Rendszeres időközönként új csomagok jönnek létre. Minden időintervallum elején egy új csomag jön létre, és az adott intervallum alatt kapott adatok belekerülnek a csomagba. Az intervallum végén a csomagok növekedése leáll. Az intervallum méretét a kötegelt intervallumnak nevezett paraméter határozza meg;
Apache Spark SQL - egyesíti a relációs feldolgozást a Spark funkcionális programozásával. A strukturált adatok olyan adatokat jelentenek, amelyeknek sémája van, azaz egyetlen mezőkészlet minden rekordhoz. A Spark SQL számos strukturált adatforrásból támogatja a bevitelt, és a sémainformációk elérhetőségének köszönhetően csak a szükséges rekordmezőket tudja hatékonyan lekérni, valamint DataFrame API-kat is biztosít;
AWS RDS egy viszonylag olcsó felhő alapú relációs adatbázis, webszolgáltatás, amely leegyszerűsíti a beállítást, a működést és a méretezést, és közvetlenül az Amazon adminisztrálja.
A Kafka szerver telepítése és futtatása
A Kafka közvetlen használata előtt meg kell győződnie arról, hogy rendelkezik Java-val, mert... A JVM-et a következő munkákhoz használják:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
A következő lépés nem kötelező. Az a tény, hogy az alapértelmezett beállítások nem teszik lehetővé az Apache Kafka összes funkciójának teljes körű használatát. Például töröljön egy témát, kategóriát, csoportot, amelyhez üzeneteket lehet közzétenni. Ennek megváltoztatásához szerkesszük a konfigurációs fájlt:
vim ~/kafka/config/server.properties
Adja hozzá a következőt a fájl végéhez:
delete.topic.enable = true
A Kafka szerver elindítása előtt el kell indítania a ZooKeeper szervert; a Kafka disztribúcióhoz mellékelt segédszkriptet fogjuk használni:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Miután a ZooKeeper sikeresen elindult, indítsa el a Kafka szervert egy külön terminálon:
Hagyjuk ki a termelő és fogyasztó tesztelésének pillanatait az újonnan létrehozott témához. Az üzenetek küldésének és fogadásának tesztelésével kapcsolatos további részletek a hivatalos dokumentációban találhatók - Küldj néhány üzenetet. Nos, áttérünk arra, hogy a KafkaProducer API segítségével Pythonban készítsünk producert.
Producer írása
A gyártó véletlenszerű adatokat generál – másodpercenként 100 üzenetet. Véletlenszerű adatokon egy három mezőből álló szótárat értünk:
Ág — a hitelintézet értékesítési helyének neve;
Valuta — tranzakció pénzneme;
Összeg — tranzakció összege. Az összeg pozitív szám lesz, ha a Bank valutavásárlásáról van szó, és negatív szám, ha eladásról van szó.
Ezután a küldési módszerrel üzenetet küldünk a szervernek, a kívánt témához, JSON formátumban:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
A szkript futtatásakor a következő üzeneteket kapjuk a terminálban:
Ez azt jelenti, hogy minden úgy működik, ahogy akartuk – a producer generál és küld üzeneteket a számunkra szükséges témához.
A következő lépés a Spark telepítése és az üzenetfolyam feldolgozása.
Az Apache Spark telepítése
Apache Spark egy univerzális és nagy teljesítményű fürt számítástechnikai platform.
A Spark jobban teljesít, mint a MapReduce modell népszerű megvalósításai, miközben a számítási típusok szélesebb körét támogatja, beleértve az interaktív lekérdezéseket és az adatfolyam-feldolgozást. A sebesség fontos szerepet játszik nagy mennyiségű adat feldolgozásakor, mivel ez a sebesség teszi lehetővé az interaktív munkát anélkül, hogy perceket vagy órákat kellene várakoznia. A Spark egyik legnagyobb erőssége, ami ilyen gyorssá teszi, az a képessége, hogy memórián belüli számításokat végezhet.
Ez a keretrendszer Scalában van írva, ezért először telepítenie kell:
sudo apt-get install scala
Töltse le a Spark disztribúciót a hivatalos webhelyről:
A bashrc módosítása után futtassa az alábbi parancsot:
source ~/.bashrc
Az AWS PostgreSQL telepítése
Már csak az adatbázis telepítése van hátra, amelybe feltöltjük a streamekből feldolgozott információkat. Ehhez az AWS RDS szolgáltatást fogjuk használni.
Válassza ki a PostgreSQL-t, és kattintson a Tovább gombra:
Mert Ez a példa csak oktatási célokat szolgál; „legalább” ingyenes szervert fogunk használni (Free Tier):
Ezután bejelöljük a Free Tier blokkot, és ezután automatikusan felajánljuk a t2.micro osztály egy példányát - bár gyenge, de ingyenes és a mi feladatunkra teljesen alkalmas:
Ezután nagyon fontos dolgok következnek: az adatbázispéldány neve, a fő felhasználó neve és jelszava. Nevezzük el a példányt: myHabrTest, fő felhasználó: habr, Jelszó: habr12345 és kattintson a Tovább gombra:
A következő oldalon az adatbázisszerverünk kívülről való elérhetőségéért (Nyilvános hozzáférhetőség) és a port elérhetőségéért felelős paraméterek találhatók:
Hozzunk létre egy új beállítást a VPC biztonsági csoporthoz, amely külső hozzáférést tesz lehetővé adatbázis-szerverünkhöz az 5432-es porton (PostgreSQL) keresztül.
Menjünk az AWS konzolra egy külön böngészőablakban a VPC Dashboard -> Biztonsági csoportok -> Biztonsági csoport létrehozása szakaszba:
Beállítjuk a biztonsági csoport nevét - PostgreSQL, leírást, jelezzük, hogy melyik VPC-hez kell társítani ezt a csoportot, és kattintson a Létrehozás gombra:
Töltse ki az 5432-es port bejövő szabályait az újonnan létrehozott csoporthoz az alábbi képen látható módon. A portot nem adhatja meg manuálisan, hanem válassza ki a PostgreSQL-t a Típus legördülő listából.
Szigorúan véve a ::/0 érték a bejövő forgalom elérhetőségét jelenti a szerver felé a világ minden tájáról, ami kanonikusan nem teljesen igaz, de a példa elemzéséhez engedjük meg magunknak ezt a megközelítést:
Visszatérünk a böngésző oldalra, ahol megnyílik a „Speciális beállítások konfigurálása”, és a VPC biztonsági csoportok részben válassza ki a -> Select meglévő VPC biztonsági csoportokat -> PostgreSQL:
Ezután az Adatbázis beállításai -> Adatbázis neve -> állítsa be a nevet - habrDB.
A többi paramétert a biztonsági mentés letiltása (a biztonsági mentés megőrzési ideje - 0 nap), a monitorozás és a Performance Insights kivételével alapértelmezés szerint hagyhatjuk. Kattintson a gombra Adatbázis létrehozása:
Szálkezelő
Az utolsó szakasz egy Spark-feladat kidolgozása lesz, amely két másodpercenként feldolgozza a Kafkától érkező új adatokat, és az eredményt beviszi az adatbázisba.
Amint fentebb megjegyeztük, az ellenőrzőpontok a SparkStreaming egyik alapvető mechanizmusa, amelyet a hibatűrés biztosítása érdekében konfigurálni kell. Ellenőrzőpontokat fogunk használni, és ha az eljárás sikertelen, a Spark Streaming modulnak csak az utolsó ellenőrzőponthoz kell visszatérnie, és onnan kell folytatnia a számításokat az elveszett adatok helyreállításához.
Az ellenőrzőpontokat úgy lehet engedélyezni, hogy egy hibatűrő, megbízható fájlrendszeren (például HDFS, S3 stb.) beállítunk egy könyvtárat, amelyben az ellenőrzési pontok információi kerülnek tárolásra. Ez például a következők használatával történik:
streamingContext.checkpoint(checkpointDirectory)
Példánkban a következő megközelítést fogjuk használni, nevezetesen, ha a checkpointDirectory létezik, akkor a kontextus újra létrejön az ellenőrzőpont adataiból. Ha a könyvtár nem létezik (azaz első alkalommal fut le), akkor a functionToCreateContext meghívásra kerül egy új kontextus létrehozása és a DStreams konfigurálása:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Létrehozunk egy DirectStream objektumot, amely a KafkaUtils könyvtár createDirectStream metódusával kapcsolódik a „tranzakciós” témához:
A Spark SQL használatával egyszerű csoportosítást végzünk, és az eredményt megjelenítjük a konzolon:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
A lekérdezés szövegének lekérése és futtatása Spark SQL-en keresztül:
Ezután az eredményül kapott összesített adatokat egy táblázatba mentjük az AWS RDS-ben. Az összesítési eredmények adatbázistáblába mentéséhez a DataFrame objektum írási metódusát fogjuk használni:
Néhány szó az AWS RDS-hez való csatlakozás beállításáról. A felhasználót és a jelszót az „AWS PostgreSQL telepítése” lépésben hoztuk létre. Használja az Endpointot adatbázis-kiszolgáló URL-jeként, amely a Kapcsolatok és biztonság részben jelenik meg:
A Spark és a Kafka megfelelő összekapcsolásához futtassa a feladatot a smark-submit segítségével a műtermék használatával szikra-streaming-kafka-0-8_2.11. Ezenkívül egy műterméket is használunk a PostgreSQL adatbázissal való interakcióhoz; ezeket --csomagokon keresztül továbbítjuk.
A szkript rugalmassága érdekében bemeneti paraméterként megadjuk az üzenetszerver nevét és a témát is, ahonnan adatokat szeretnénk fogadni.
Tehát itt az ideje elindítani és ellenőrizni a rendszer működését:
Minden sikerült! Ahogy az alábbi képen is látható, az alkalmazás futása közben 2 másodpercenként jelennek meg az új összesítési eredmények, mivel a StreamingContext objektum létrehozásakor a kötegelési intervallumot 2 másodpercre állítottuk be:
Ezután egyszerű lekérdezést végzünk az adatbázisban, hogy ellenőrizzük a rekordok jelenlétét a táblában tranzakció_folyamat:
Következtetés
Ez a cikk egy példát mutatott be a Spark Streaming és az Apache Kafka és a PostgreSQL együttes használatával végzett adatfolyam-feldolgozásra. A különböző forrásokból származó adatok növekedésével nehéz túlbecsülni a Spark Streaming gyakorlati értékét az adatfolyam- és valós idejű alkalmazások létrehozásában.
A teljes forráskódot megtalálja a tárhelyemben a címen GitHub.
Szívesen megvitatom ezt a cikket, várom észrevételeiket, és építő kritikát is remélek minden figyelmes olvasótól.
Sok sikert kívánok!
Zsolt. Kezdetben egy helyi PostgreSQL adatbázist terveztek használni, de az AWS iránti szeretetem miatt úgy döntöttem, hogy áthelyezem az adatbázist a felhőbe. A témával foglalkozó következő cikkben bemutatom, hogyan lehet megvalósítani a fent leírt teljes rendszert AWS-ben AWS Kinesis és AWS EMR segítségével. Kövesd a híreket!