Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Ahoj Habr! Dnes postavíme systém, ktorý bude spracovávať streamy správ Apache Kafka pomocou Spark Streaming a zapisovať výsledky spracovania do cloudovej databázy AWS RDS.

Predstavme si, že určitá úverová inštitúcia nám dáva za úlohu spracovávať prichádzajúce transakcie „za chodu“ vo všetkých jej pobočkách. Dá sa to urobiť za účelom rýchleho výpočtu otvorenej menovej pozície pre štátnu pokladnicu, limitov alebo finančných výsledkov transakcií atď.

Ako implementovať tento prípad bez použitia mágie a magických kúziel - prečítajte si pod rezom! Choď!

Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming
(zdroj obrázka)

Úvod

Spracovanie veľkého množstva dát v reálnom čase samozrejme poskytuje široké možnosti využitia v moderných systémoch. Jednou z najpopulárnejších kombinácií je tandem Apache Kafka a Spark Streaming, kde Kafka vytvára prúd paketov prichádzajúcich správ a Spark Streaming tieto pakety spracováva v danom časovom intervale.

Na zvýšenie odolnosti aplikácie voči chybám použijeme kontrolné body. S týmto mechanizmom, keď motor Spark Streaming potrebuje obnoviť stratené dáta, stačí sa vrátiť k poslednému kontrolnému bodu a odtiaľ obnoviť výpočty.

Architektúra vyvinutého systému

Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Použité komponenty:

  • Apache Kafka je distribuovaný systém zasielania správ typu publikovať a odoberať. Vhodné pre offline aj online spotrebu správ. Aby sa predišlo strate údajov, správy Kafka sa ukladajú na disk a replikujú sa v rámci klastra. Systém Kafka je postavený nad synchronizačnou službou ZooKeeper;
  • Streamovanie Apache Spark - Komponent Spark na spracovanie streamovaných dát. Modul Spark Streaming je zostavený pomocou mikrodávkovej architektúry, kde je dátový tok interpretovaný ako súvislá sekvencia malých dátových paketov. Spark Streaming berie dáta z rôznych zdrojov a kombinuje ich do malých balíkov. Nové balíčky sa vytvárajú v pravidelných intervaloch. Na začiatku každého časového intervalu sa vytvorí nový paket a všetky dáta prijaté počas tohto intervalu sú zahrnuté do paketu. Na konci intervalu sa rast paketov zastaví. Veľkosť intervalu je určená parametrom nazývaným dávkový interval;
  • Apache Spark SQL - kombinuje relačné spracovanie s funkčným programovaním Spark. Štruktúrované údaje znamenajú údaje, ktoré majú schému, teda jednu množinu polí pre všetky záznamy. Spark SQL podporuje vstup z rôznych zdrojov štruktúrovaných údajov a vďaka dostupnosti informácií o schéme dokáže efektívne získať len požadované polia záznamov a tiež poskytuje rozhrania DataFrame API;
  • AWS RDS je relatívne lacná cloudová relačná databáza, webová služba, ktorá zjednodušuje nastavenie, prevádzku a škálovanie a spravuje ju priamo Amazon.

Inštalácia a spustenie servera Kafka

Pred priamym použitím Kafky sa musíte uistiť, že máte Javu, pretože... JVM sa používa na prácu:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Vytvorme nového používateľa na prácu s Kafkom:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Ďalej si stiahnite distribúciu z oficiálnej webovej stránky Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Rozbaľte stiahnutý archív:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Ďalší krok je voliteľný. Faktom je, že predvolené nastavenia vám neumožňujú plne využiť všetky možnosti Apache Kafka. Môžete napríklad odstrániť tému, kategóriu, skupinu, do ktorej sa môžu publikovať správy. Ak to chcete zmeniť, upravte konfiguračný súbor:

vim ~/kafka/config/server.properties

Na koniec súboru pridajte nasledovné:

delete.topic.enable = true

Pred spustením servera Kafka je potrebné spustiť server ZooKeeper, použijeme pomocný skript, ktorý je súčasťou distribúcie Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Po úspešnom spustení ZooKeepera spustite server Kafka v samostatnom termináli:

bin/kafka-server-start.sh config/server.properties

Vytvorme novú tému s názvom Transakcia:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Uistite sa, že bola vytvorená téma s požadovaným počtom oddielov a replikácie:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Premeškajme chvíle testovania výrobcu a spotrebiteľa pre novovzniknutú tému. Ďalšie podrobnosti o tom, ako môžete otestovať odosielanie a prijímanie správ, sú uvedené v oficiálnej dokumentácii - Pošlite nejaké správy. No, prejdeme k písaniu producenta v Pythone pomocou KafkaProducer API.

Producentské písanie

Výrobca vygeneruje náhodné dáta - 100 správ každú sekundu. Pod náhodnými údajmi rozumieme slovník pozostávajúci z troch polí:

  • Vetva — názov predajného miesta úverovej inštitúcie;
  • mena — mena transakcie;
  • čiastka - hodnota transakcie. Suma bude kladné číslo, ak ide o nákup meny bankou, a záporné číslo, ak ide o predaj.

Kód výrobcu vyzerá takto:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Ďalej pomocou metódy send odošleme správu na server na tému, ktorú potrebujeme, vo formáte JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Pri spustení skriptu dostávame v termináli nasledujúce správy:

Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

To znamená, že všetko funguje tak, ako sme chceli – producent generuje a posiela správy k téme, ktorú potrebujeme.
Ďalším krokom je inštalácia Spark a spracovanie tohto streamu správ.

Inštalácia Apache Spark

Apache Spark je univerzálna a vysokovýkonná klastrová výpočtová platforma.

Spark funguje lepšie ako populárne implementácie modelu MapReduce a zároveň podporuje širšiu škálu typov výpočtov, vrátane interaktívnych dotazov a spracovania streamov. Rýchlosť hrá dôležitú úlohu pri spracovaní veľkého množstva dát, pretože práve rýchlosť vám umožňuje pracovať interaktívne bez trávenia minút alebo hodín čakaním. Jednou z najväčších predností Sparku, vďaka ktorej je taký rýchly, je jeho schopnosť vykonávať výpočty v pamäti.

Tento rámec je napísaný v Scale, takže ho musíte najskôr nainštalovať:

sudo apt-get install scala

Stiahnite si distribúciu Spark z oficiálnej webovej stránky:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Rozbaľte archív:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Pridajte cestu k Sparku do súboru bash:

vim ~/.bashrc

Pomocou editora pridajte nasledujúce riadky:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Po vykonaní zmien v bashrc spustite príkaz uvedený nižšie:

source ~/.bashrc

Nasadenie AWS PostgreSQL

Ostáva už len nasadiť databázu, do ktorej nahráme spracované informácie zo streamov. Na tento účel použijeme službu AWS RDS.

Prejdite do konzoly AWS -> AWS RDS -> Databázy -> Vytvoriť databázu:
Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Vyberte PostgreSQL a kliknite na Ďalej:
Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Pretože Tento príklad slúži len na vzdelávacie účely; použijeme bezplatný server „minimálne“ (bezplatná úroveň):
Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Ďalej zaškrtneme blok Free Tier a potom sa nám automaticky ponúkne inštancia triedy t2.micro - hoci je slabá, je bezplatná a celkom vhodná pre našu úlohu:
Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Ďalej prichádzajú veľmi dôležité veci: názov inštancie databázy, meno hlavného užívateľa a jeho heslo. Pomenujme inštanciu: myHabrTest, hlavný používateľ: habr, heslo: habr12345 a kliknite na tlačidlo Ďalej:
Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Na ďalšej stránke sú parametre zodpovedné za prístupnosť nášho databázového servera zvonku (Public accessibility) a dostupnosť portov:

Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Vytvorme nové nastavenie pre bezpečnostnú skupinu VPC, ktorá umožní externý prístup k nášmu databázovému serveru cez port 5432 (PostgreSQL).
Poďme na konzolu AWS v samostatnom okne prehliadača do sekcie VPC Dashboard -> Bezpečnostné skupiny -> Vytvoriť bezpečnostnú skupinu:
Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Nastavili sme názov pre bezpečnostnú skupinu - PostgreSQL, popis, uveďte, ku ktorému VPC má byť táto skupina priradená a kliknite na tlačidlo Vytvoriť:
Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Vyplňte pravidlá vstupu pre port 5432 pre novovytvorenú skupinu, ako je znázornené na obrázku nižšie. Port nemôžete zadať ručne, ale vyberte PostgreSQL z rozbaľovacieho zoznamu Typ.

Presne povedané, hodnota ::/0 znamená dostupnosť prichádzajúcej prevádzky na server z celého sveta, čo kanonicky nie je úplne pravda, ale na analýzu príkladu si dovoľme použiť tento prístup:
Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Vrátime sa na stránku prehliadača, kde máme otvorené “Konfigurovať rozšírené nastavenia” a v sekcii VPC bezpečnostné skupiny vyberte -> Vybrať existujúce bezpečnostné skupiny VPC -> PostgreSQL:
Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Ďalej v možnostiach databázy -> Názov databázy -> nastavte názov - habrDB.

Zvyšné parametre, s výnimkou zakázania zálohovania (doba uchovávania zálohy – 0 dní), monitorovania a Performance Insights, môžeme ponechať štandardne. Kliknite na tlačidlo Vytvorte databázu:
Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Manipulátor závitov

Poslednou fázou bude vývoj úlohy Spark, ktorá každé dve sekundy spracuje nové dáta prichádzajúce od Kafku a výsledok vloží do databázy.

Ako je uvedené vyššie, kontrolné body sú základným mechanizmom v SparkStreaming, ktorý musí byť nakonfigurovaný na zabezpečenie odolnosti voči chybám. Použijeme kontrolné body a ak postup zlyhá, modul Spark Streaming sa bude musieť vrátiť iba k poslednému kontrolnému bodu a obnoviť z neho výpočty na obnovenie stratených údajov.

Kontrolný bod je možné povoliť nastavením adresára na spoľahlivom súborovom systéme odolnom voči chybám (napríklad HDFS, S3 atď.), v ktorom budú uložené informácie o kontrolnom bode. Robí sa to napríklad pomocou:

streamingContext.checkpoint(checkpointDirectory)

V našom príklade použijeme nasledujúci prístup, menovite, ak existuje checkpointDirectory, kontext sa znovu vytvorí z údajov kontrolného bodu. Ak adresár neexistuje (t. j. je spustený prvýkrát), potom sa zavolá funkcia functionToCreateContext na vytvorenie nového kontextu a konfiguráciu DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Vytvoríme objekt DirectStream na pripojenie k téme „transakcia“ pomocou metódy createDirectStream knižnice KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Analýza prichádzajúcich údajov vo formáte JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Pomocou Spark SQL urobíme jednoduché zoskupenie a výsledok zobrazíme v konzole:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Získanie textu dotazu a jeho spustenie cez Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

A potom výsledné agregované údaje uložíme do tabuľky v AWS RDS. Na uloženie výsledkov agregácie do databázovej tabuľky použijeme metódu zápisu objektu DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Niekoľko slov o nastavení pripojenia k AWS RDS. Používateľa a heslo sme preň vytvorili v kroku „Nasadenie AWS PostgreSQL“. Koncový bod by ste mali použiť ako adresu URL databázového servera, ktorá sa zobrazuje v časti Pripojenie a zabezpečenie:

Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Aby ste správne prepojili Spark a Kafku, mali by ste spustiť úlohu cez smark-submit pomocou artefaktu spark-streaming-kafka-0-8_2.11. Okrem toho použijeme aj artefakt na interakciu s databázou PostgreSQL, prenesieme ich cez --packages.

Pre flexibilitu skriptu zahrnieme ako vstupné parametre aj názov servera správ a tému, z ktorej chceme prijímať dáta.

Je teda čas spustiť a skontrolovať funkčnosť systému:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Všetko vyšlo! Ako môžete vidieť na obrázku nižšie, keď je aplikácia spustená, nové výsledky agregácie sa generujú každé 2 sekundy, pretože pri vytváraní objektu StreamingContext sme nastavili interval dávkovania na 2 sekundy:

Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Ďalej urobíme jednoduchý dotaz do databázy na kontrolu prítomnosti záznamov v tabuľke tok_transakcií:

Apache Kafka a spracovanie streamovaných dát pomocou Spark Streaming

Záver

Tento článok sa zameral na príklad streamového spracovania informácií pomocou Spark Streaming v spojení s Apache Kafka a PostgreSQL. S nárastom údajov z rôznych zdrojov je ťažké preceňovať praktickú hodnotu Spark Streaming pre vytváranie streamingu a aplikácií v reálnom čase.

Úplný zdrojový kód nájdete v mojom úložisku na adrese GitHub.

Rád diskutujem o tomto článku, teším sa na vaše komentáre a dúfam aj v konštruktívnu kritiku od všetkých starostlivých čitateľov.

Prajem vám úspech!

Ps. Pôvodne bolo plánované použitie lokálnej databázy PostgreSQL, ale vzhľadom na moju lásku k AWS som sa rozhodol presunúť databázu do cloudu. V ďalšom článku na túto tému ukážem, ako implementovať celý vyššie popísaný systém v AWS pomocou AWS Kinesis a AWS EMR. Sledujte novinky!

Zdroj: hab.com

Pridať komentár