Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming
Ahoj Habr! Dnes postavíme systém, ktorý bude spracovávať streamy správ Apache Kafka pomocou Spark Streaming a zapisovať výsledky spracovania do cloudovej databázy AWS RDS.
Predstavme si, že určitá úverová inštitúcia nám dáva za úlohu spracovávať prichádzajúce transakcie „za chodu“ vo všetkých jej pobočkách. Dá sa to urobiť za účelom rýchleho výpočtu otvorenej menovej pozície pre štátnu pokladnicu, limitov alebo finančných výsledkov transakcií atď.
Ako implementovať tento prípad bez použitia mágie a magických kúziel - prečítajte si pod rezom! Choď!
Spracovanie veľkého množstva dát v reálnom čase samozrejme poskytuje široké možnosti využitia v moderných systémoch. Jednou z najpopulárnejších kombinácií je tandem Apache Kafka a Spark Streaming, kde Kafka vytvára prúd paketov prichádzajúcich správ a Spark Streaming tieto pakety spracováva v danom časovom intervale.
Na zvýšenie odolnosti aplikácie voči chybám použijeme kontrolné body. S týmto mechanizmom, keď motor Spark Streaming potrebuje obnoviť stratené dáta, stačí sa vrátiť k poslednému kontrolnému bodu a odtiaľ obnoviť výpočty.
Architektúra vyvinutého systému
Použité komponenty:
Apache Kafka je distribuovaný systém zasielania správ typu publikovať a odoberať. Vhodné pre offline aj online spotrebu správ. Aby sa predišlo strate údajov, správy Kafka sa ukladajú na disk a replikujú sa v rámci klastra. Systém Kafka je postavený nad synchronizačnou službou ZooKeeper;
Streamovanie Apache Spark - Komponent Spark na spracovanie streamovaných dát. Modul Spark Streaming je zostavený pomocou mikrodávkovej architektúry, kde je dátový tok interpretovaný ako súvislá sekvencia malých dátových paketov. Spark Streaming berie dáta z rôznych zdrojov a kombinuje ich do malých balíkov. Nové balíčky sa vytvárajú v pravidelných intervaloch. Na začiatku každého časového intervalu sa vytvorí nový paket a všetky dáta prijaté počas tohto intervalu sú zahrnuté do paketu. Na konci intervalu sa rast paketov zastaví. Veľkosť intervalu je určená parametrom nazývaným dávkový interval;
Apache Spark SQL - kombinuje relačné spracovanie s funkčným programovaním Spark. Štruktúrované údaje znamenajú údaje, ktoré majú schému, teda jednu množinu polí pre všetky záznamy. Spark SQL podporuje vstup z rôznych zdrojov štruktúrovaných údajov a vďaka dostupnosti informácií o schéme dokáže efektívne získať len požadované polia záznamov a tiež poskytuje rozhrania DataFrame API;
AWS RDS je relatívne lacná cloudová relačná databáza, webová služba, ktorá zjednodušuje nastavenie, prevádzku a škálovanie a spravuje ju priamo Amazon.
Inštalácia a spustenie servera Kafka
Pred priamym použitím Kafky sa musíte uistiť, že máte Javu, pretože... JVM sa používa na prácu:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Ďalší krok je voliteľný. Faktom je, že predvolené nastavenia vám neumožňujú plne využiť všetky možnosti Apache Kafka. Môžete napríklad odstrániť tému, kategóriu, skupinu, do ktorej sa môžu publikovať správy. Ak to chcete zmeniť, upravte konfiguračný súbor:
vim ~/kafka/config/server.properties
Na koniec súboru pridajte nasledovné:
delete.topic.enable = true
Pred spustením servera Kafka je potrebné spustiť server ZooKeeper, použijeme pomocný skript, ktorý je súčasťou distribúcie Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Po úspešnom spustení ZooKeepera spustite server Kafka v samostatnom termináli:
Premeškajme chvíle testovania výrobcu a spotrebiteľa pre novovzniknutú tému. Ďalšie podrobnosti o tom, ako môžete otestovať odosielanie a prijímanie správ, sú uvedené v oficiálnej dokumentácii - Pošlite nejaké správy. No, prejdeme k písaniu producenta v Pythone pomocou KafkaProducer API.
Producentské písanie
Výrobca vygeneruje náhodné dáta - 100 správ každú sekundu. Pod náhodnými údajmi rozumieme slovník pozostávajúci z troch polí:
Vetva — názov predajného miesta úverovej inštitúcie;
mena — mena transakcie;
čiastka - hodnota transakcie. Suma bude kladné číslo, ak ide o nákup meny bankou, a záporné číslo, ak ide o predaj.
Ďalej pomocou metódy send odošleme správu na server na tému, ktorú potrebujeme, vo formáte JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Pri spustení skriptu dostávame v termináli nasledujúce správy:
To znamená, že všetko funguje tak, ako sme chceli – producent generuje a posiela správy k téme, ktorú potrebujeme.
Ďalším krokom je inštalácia Spark a spracovanie tohto streamu správ.
Inštalácia Apache Spark
Apache Spark je univerzálna a vysokovýkonná klastrová výpočtová platforma.
Spark funguje lepšie ako populárne implementácie modelu MapReduce a zároveň podporuje širšiu škálu typov výpočtov, vrátane interaktívnych dotazov a spracovania streamov. Rýchlosť hrá dôležitú úlohu pri spracovaní veľkého množstva dát, pretože práve rýchlosť vám umožňuje pracovať interaktívne bez trávenia minút alebo hodín čakaním. Jednou z najväčších predností Sparku, vďaka ktorej je taký rýchly, je jeho schopnosť vykonávať výpočty v pamäti.
Tento rámec je napísaný v Scale, takže ho musíte najskôr nainštalovať:
sudo apt-get install scala
Stiahnite si distribúciu Spark z oficiálnej webovej stránky:
Po vykonaní zmien v bashrc spustite príkaz uvedený nižšie:
source ~/.bashrc
Nasadenie AWS PostgreSQL
Ostáva už len nasadiť databázu, do ktorej nahráme spracované informácie zo streamov. Na tento účel použijeme službu AWS RDS.
Prejdite do konzoly AWS -> AWS RDS -> Databázy -> Vytvoriť databázu:
Vyberte PostgreSQL a kliknite na Ďalej:
Pretože Tento príklad slúži len na vzdelávacie účely; použijeme bezplatný server „minimálne“ (bezplatná úroveň):
Ďalej zaškrtneme blok Free Tier a potom sa nám automaticky ponúkne inštancia triedy t2.micro - hoci je slabá, je bezplatná a celkom vhodná pre našu úlohu:
Ďalej prichádzajú veľmi dôležité veci: názov inštancie databázy, meno hlavného užívateľa a jeho heslo. Pomenujme inštanciu: myHabrTest, hlavný používateľ: habr, heslo: habr12345 a kliknite na tlačidlo Ďalej:
Na ďalšej stránke sú parametre zodpovedné za prístupnosť nášho databázového servera zvonku (Public accessibility) a dostupnosť portov:
Vytvorme nové nastavenie pre bezpečnostnú skupinu VPC, ktorá umožní externý prístup k nášmu databázovému serveru cez port 5432 (PostgreSQL).
Poďme na konzolu AWS v samostatnom okne prehliadača do sekcie VPC Dashboard -> Bezpečnostné skupiny -> Vytvoriť bezpečnostnú skupinu:
Nastavili sme názov pre bezpečnostnú skupinu - PostgreSQL, popis, uveďte, ku ktorému VPC má byť táto skupina priradená a kliknite na tlačidlo Vytvoriť:
Vyplňte pravidlá vstupu pre port 5432 pre novovytvorenú skupinu, ako je znázornené na obrázku nižšie. Port nemôžete zadať ručne, ale vyberte PostgreSQL z rozbaľovacieho zoznamu Typ.
Presne povedané, hodnota ::/0 znamená dostupnosť prichádzajúcej prevádzky na server z celého sveta, čo kanonicky nie je úplne pravda, ale na analýzu príkladu si dovoľme použiť tento prístup:
Vrátime sa na stránku prehliadača, kde máme otvorené “Konfigurovať rozšírené nastavenia” a v sekcii VPC bezpečnostné skupiny vyberte -> Vybrať existujúce bezpečnostné skupiny VPC -> PostgreSQL:
Ďalej v možnostiach databázy -> Názov databázy -> nastavte názov - habrDB.
Zvyšné parametre, s výnimkou zakázania zálohovania (doba uchovávania zálohy – 0 dní), monitorovania a Performance Insights, môžeme ponechať štandardne. Kliknite na tlačidlo Vytvorte databázu:
Manipulátor závitov
Poslednou fázou bude vývoj úlohy Spark, ktorá každé dve sekundy spracuje nové dáta prichádzajúce od Kafku a výsledok vloží do databázy.
Ako je uvedené vyššie, kontrolné body sú základným mechanizmom v SparkStreaming, ktorý musí byť nakonfigurovaný na zabezpečenie odolnosti voči chybám. Použijeme kontrolné body a ak postup zlyhá, modul Spark Streaming sa bude musieť vrátiť iba k poslednému kontrolnému bodu a obnoviť z neho výpočty na obnovenie stratených údajov.
Kontrolný bod je možné povoliť nastavením adresára na spoľahlivom súborovom systéme odolnom voči chybám (napríklad HDFS, S3 atď.), v ktorom budú uložené informácie o kontrolnom bode. Robí sa to napríklad pomocou:
streamingContext.checkpoint(checkpointDirectory)
V našom príklade použijeme nasledujúci prístup, menovite, ak existuje checkpointDirectory, kontext sa znovu vytvorí z údajov kontrolného bodu. Ak adresár neexistuje (t. j. je spustený prvýkrát), potom sa zavolá funkcia functionToCreateContext na vytvorenie nového kontextu a konfiguráciu DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Vytvoríme objekt DirectStream na pripojenie k téme „transakcia“ pomocou metódy createDirectStream knižnice KafkaUtils:
Pomocou Spark SQL urobíme jednoduché zoskupenie a výsledok zobrazíme v konzole:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Získanie textu dotazu a jeho spustenie cez Spark SQL:
A potom výsledné agregované údaje uložíme do tabuľky v AWS RDS. Na uloženie výsledkov agregácie do databázovej tabuľky použijeme metódu zápisu objektu DataFrame:
Niekoľko slov o nastavení pripojenia k AWS RDS. Používateľa a heslo sme preň vytvorili v kroku „Nasadenie AWS PostgreSQL“. Koncový bod by ste mali použiť ako adresu URL databázového servera, ktorá sa zobrazuje v časti Pripojenie a zabezpečenie:
Aby ste správne prepojili Spark a Kafku, mali by ste spustiť úlohu cez smark-submit pomocou artefaktu spark-streaming-kafka-0-8_2.11. Okrem toho použijeme aj artefakt na interakciu s databázou PostgreSQL, prenesieme ich cez --packages.
Pre flexibilitu skriptu zahrnieme ako vstupné parametre aj názov servera správ a tému, z ktorej chceme prijímať dáta.
Je teda čas spustiť a skontrolovať funkčnosť systému:
Všetko vyšlo! Ako môžete vidieť na obrázku nižšie, keď je aplikácia spustená, nové výsledky agregácie sa generujú každé 2 sekundy, pretože pri vytváraní objektu StreamingContext sme nastavili interval dávkovania na 2 sekundy:
Ďalej urobíme jednoduchý dotaz do databázy na kontrolu prítomnosti záznamov v tabuľke tok_transakcií:
Záver
Tento článok sa zameral na príklad streamového spracovania informácií pomocou Spark Streaming v spojení s Apache Kafka a PostgreSQL. S nárastom údajov z rôznych zdrojov je ťažké preceňovať praktickú hodnotu Spark Streaming pre vytváranie streamingu a aplikácií v reálnom čase.
Úplný zdrojový kód nájdete v mojom úložisku na adrese GitHub.
Rád diskutujem o tomto článku, teším sa na vaše komentáre a dúfam aj v konštruktívnu kritiku od všetkých starostlivých čitateľov.
Prajem vám úspech!
Ps. Pôvodne bolo plánované použitie lokálnej databázy PostgreSQL, ale vzhľadom na moju lásku k AWS som sa rozhodol presunúť databázu do cloudu. V ďalšom článku na túto tému ukážem, ako implementovať celý vyššie popísaný systém v AWS pomocou AWS Kinesis a AWS EMR. Sledujte novinky!