Apache Kafka a streamování dat pomocí Spark Streaming

Čau Habr! Dnes postavíme systém, který bude zpracovávat streamy zpráv Apache Kafka pomocí Spark Streaming a zapíše výsledek zpracování do cloudové databáze AWS RDS.

Představme si, že určitá úvěrová instituce před nás postaví úkol zpracovávat příchozí transakce „za chodu“ pro všechny své pobočky. To lze provést za účelem rychlého výpočtu otevřené měnové pozice pro státní pokladnu, limity nebo finanční výsledek transakcí atd.

Jak realizovat tento případ bez použití magie a kouzel - přečtěte si pod řezem! Jít!

Apache Kafka a streamování dat pomocí Spark Streaming
(zdroj obrázku)

úvod

Zpracování velkého množství dat v reálném čase samozřejmě poskytuje široké možnosti využití v moderních systémech. Jednou z nejoblíbenějších kombinací je tandem Apache Kafka a Spark Streaming, kde Kafka vytváří proud příchozích zpráv a Spark Streaming tyto pakety zpracovává v určeném časovém intervalu.

Pro zlepšení odolnosti aplikace proti chybám použijeme kontrolní body – kontrolní body. S tímto mechanismem, když modul Spark Streaming potřebuje obnovit ztracená data, stačí se vrátit k poslednímu kontrolnímu bodu a odtud obnovit výpočty.

Architektura vyvíjeného systému

Apache Kafka a streamování dat pomocí Spark Streaming

Použité komponenty:

  • Apache Kafka je distribuovaný systém zasílání zpráv publikováním a odběrem. Vhodné pro offline i online spotřebu zpráv. Aby se zabránilo ztrátě dat, jsou zprávy Kafka uloženy na disku a replikovány v rámci clusteru. Systém Kafka je postaven na synchronizační službě ZooKeeper;
  • Streamování Apache Spark - komponenta Spark pro zpracování streamovaných dat. Modul Spark Streaming je postaven pomocí mikrodávkové architektury, kdy je datový tok interpretován jako souvislá sekvence malých datových paketů. Spark Streaming bere data z různých zdrojů a kombinuje je do malých dávek. Nové balíčky jsou vytvářeny v pravidelných intervalech. Na začátku každého časového intervalu je vytvořen nový paket a všechna data přijatá během tohoto intervalu jsou zahrnuta do paketu. Na konci intervalu se růst paketů zastaví. Velikost intervalu je určena parametrem nazývaným dávkový interval;
  • Apache Spark SQL - Kombinuje relační zpracování s funkčním programováním Spark. Strukturovaná data se týkají dat, která mají schéma, tj. jednu sadu polí pro všechny záznamy. Spark SQL podporuje vstup z různých zdrojů strukturovaných dat a díky přítomnosti informací o schématu dokáže efektivně načíst pouze požadovaná pole záznamů a také poskytuje rozhraní API DataFrame;
  • AWS RDS je relativně levná cloudová relační databáze, webová služba, která zjednodušuje nastavení, provoz a škálování, spravovaná přímo Amazonem.

Instalace a spuštění serveru Kafka

Před přímým použitím Kafky se musíte ujistit, že máte Javu, protože JVM se používá pro práci:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Vytvořme nového uživatele pro práci s Kafkou:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Dále si stáhněte distribuční sadu z oficiálního webu Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Rozbalte stažený archiv:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Další krok je volitelný. Faktem je, že výchozí nastavení neumožňuje plně využívat všechny funkce Apache Kafka. Například smazat téma, kategorii, skupinu, na které lze publikovat zprávy. Chcete-li to změnit, upravte konfigurační soubor:

vim ~/kafka/config/server.properties

Na konec souboru přidejte následující:

delete.topic.enable = true

Před spuštěním serveru Kafka je třeba spustit server ZooKeeper, použijeme pomocný skript, který je součástí distribuce Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Po úspěšném spuštění ZooKeeperu spustíme server Kafka v samostatném terminálu:

bin/kafka-server-start.sh config/server.properties

Pojďme vytvořit nové téma s názvem Transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Ujistíme se, že bylo vytvořeno téma s požadovaným počtem oddílů a replikací:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka a streamování dat pomocí Spark Streaming

Promeškejme chvíle testování výrobce a spotřebitele pro nově vzniklé téma. Další podrobnosti o tom, jak můžete otestovat odesílání a přijímání zpráv, jsou uvedeny v oficiální dokumentaci - Pošlete nějaké zprávy. No, přecházíme k psaní producenta v Pythonu pomocí KafkaProducer API.

Producentské psaní

Výrobce vygeneruje náhodná data - 100 zpráv každou sekundu. Náhodnými daty rozumíme slovník sestávající ze tří polí:

  • Větev — název prodejního místa úvěrové instituce;
  • Měna — měna transakce;
  • Množství - částka transakce. Částka bude kladná, pokud se jedná o nákup měny bankou, a záporná, pokud se jedná o prodej.

Kód výrobce vypadá takto:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Dále pomocí metody send odešleme zprávu na server na téma, které potřebujeme, ve formátu JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Při spuštění skriptu dostáváme v terminálu následující zprávy:

Apache Kafka a streamování dat pomocí Spark Streaming

To znamená, že vše funguje, jak jsme chtěli – producent generuje a posílá zprávy k tématu, které potřebujeme.
Dalším krokem je instalace Sparku a zpracování tohoto toku zpráv.

Instalace Apache Spark

Apache Spark je všestranná a vysoce výkonná clusterová výpočetní platforma.

Spark překonává oblíbené implementace modelu MapReduce z hlediska výkonu a zároveň poskytuje podporu pro širší škálu typů výpočtů, včetně interaktivních dotazů a streamování. Rychlost hraje důležitou roli při zpracování velkého množství dat, protože právě rychlost vám umožňuje pracovat interaktivně, aniž byste museli trávit minuty nebo hodiny čekáním. Jednou z největších předností Sparku pro poskytování této rychlosti je jeho schopnost provádět výpočty v paměti.

Tento framework je napsán v Scala, takže jej musíte nejprve nainstalovat:

sudo apt-get install scala

Stáhněte si distribuci Spark z oficiálních stránek:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Rozbalte archiv:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Přidejte cestu k Sparku do bash souboru:

vim ~/.bashrc

Přidejte následující řádky pomocí editoru:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Po provedení změn v bashrc spusťte níže uvedený příkaz:

source ~/.bashrc

Nasazení AWS PostgreSQL

Zbývá rozšířit databázi, kam doplníme zpracované informace ze streamů. K tomu nám poslouží služba AWS RDS.

Přejděte do konzoly AWS -> AWS RDS -> Databáze -> Vytvořit databázi:
Apache Kafka a streamování dat pomocí Spark Streaming

Vyberte PostgreSQL a klikněte na tlačítko Další:
Apache Kafka a streamování dat pomocí Spark Streaming

Protože tento příklad je analyzován výhradně pro vzdělávací účely, použijeme bezplatný server „minimálně“ (Free Tier):
Apache Kafka a streamování dat pomocí Spark Streaming

Dále zaškrtněte políčko Free Tier a poté nám bude automaticky nabídnuta instance třídy t2.micro - sice slabá, ale bezplatná a pro náš úkol docela vhodná:
Apache Kafka a streamování dat pomocí Spark Streaming

Následují velmi důležité věci: název instance DB, jméno hlavního uživatele a jeho heslo. Zavolejte instanci: myHabrTest, hlavní uživatel: habr, Heslo: habr12345 a klikněte na tlačítko Další:
Apache Kafka a streamování dat pomocí Spark Streaming

Další stránka obsahuje parametry zodpovědné za přístupnost našeho databázového serveru zvenčí (Public accessibility) a dostupnost portů:

Apache Kafka a streamování dat pomocí Spark Streaming

Vytvořme nové nastavení pro bezpečnostní skupinu VPC, které umožní externí přístup k našemu databázovému serveru přes port 5432 (PostgreSQL).
Pojďme v samostatném okně prohlížeče do konzole AWS v části VPC Dashboard -> Skupiny zabezpečení -> Vytvořit skupinu zabezpečení:
Apache Kafka a streamování dat pomocí Spark Streaming

Nastavíme název pro bezpečnostní skupinu - PostgreSQL, popis, určíme, ke kterému VPC má být tato skupina přidružena a klikneme na tlačítko Vytvořit:
Apache Kafka a streamování dat pomocí Spark Streaming

Vyplňujeme pro nově vytvořenou skupinu Příchozí pravidla pro port 5432, jak je znázorněno na obrázku níže. Port nemůžete zadat ručně, ale vyberte PostgreSQL z rozevíracího seznamu Typ.

Přísně vzato, hodnota ::/0 znamená dostupnost příchozího provozu pro server z celého světa, což není kanonicky tak docela pravda, ale pro analýzu příkladu použijme tento přístup:
Apache Kafka a streamování dat pomocí Spark Streaming

Vrátíme se na stránku prohlížeče, kde máme otevřeno „Konfigurovat pokročilá nastavení“ a v sekci vybereme skupiny zabezpečení VPC -> Vybrat existující skupiny zabezpečení VPC -> PostgreSQL:
Apache Kafka a streamování dat pomocí Spark Streaming

Dále v sekci Možnosti databáze -> Název databáze -> nastavte název - habrDB.

Zbytek parametrů, s výjimkou deaktivace zálohování (doba uchování zálohy - 0 dní), sledování a Performance Insights, můžeme ponechat ve výchozím nastavení. Klikněte na tlačítko Vytvořte databázi:
Apache Kafka a streamování dat pomocí Spark Streaming

Obsluha streamu

Poslední fází bude vývoj úlohy Spark, která každé dvě sekundy zpracuje nová data od Kafky a výsledek zanese do databáze.

Jak je uvedeno výše, kontrolní body jsou hlavním mechanismem ve SparkStreaming, který musí být nakonfigurován tak, aby poskytoval odolnost proti chybám. Použijeme kontrolní body a v případě selhání procedury se modulu Spark Streaming stačí vrátit k poslednímu kontrolnímu bodu a obnovit z něj výpočty, aby obnovil ztracená data.

Kontrolní bod lze aktivovat nastavením adresáře na spolehlivém souborovém systému odolném proti chybám (např. HDFS, S3 atd.), kde budou uloženy informace o kontrolním bodu. To se provádí například:

streamingContext.checkpoint(checkpointDirectory)

V našem příkladu použijeme následující přístup, jmenovitě, pokud existuje adresář kontrolního bodu, pak bude kontext znovu vytvořen z dat kontrolního bodu. Pokud adresář neexistuje (tj. je spouštěn poprvé), zavolá se funkce functionToCreateContext k vytvoření nového kontextu a nastavení DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Vytvoříme objekt DirectStream pro připojení k tématu „transakční“ pomocí metody createDirectStream knihovny KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Analýza příchozích dat ve formátu JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Pomocí Spark SQL provedeme jednoduché seskupení a výstup výsledku do konzole:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Získání těla dotazu a jeho spuštění prostřednictvím Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

A následně přijatá agregovaná data uložíme do tabulky v AWS RDS. Pro uložení výsledků agregace do databázové tabulky použijeme metodu write objektu DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Pár slov o nastavení připojení k AWS RDS. Vytvořili jsme pro něj uživatele a heslo v kroku „Nasazení AWS PostgreSQL“. Jako adresu URL databázového serveru byste měli použít Koncový bod, který je zobrazen v části Připojení a zabezpečení:

Apache Kafka a streamování dat pomocí Spark Streaming

Abyste správně propojili Spark a Kafku, měli byste úlohu spustit prostřednictvím smark-submit pomocí artefaktu spark-streaming-kafka-0-8_2.11. Kromě toho také použijeme artefakt pro interakci s databází PostgreSQL, předáme je přes --packages.

Pro flexibilitu skriptu vyjmeme jako vstupní parametry také název serveru zpráv a téma, ze kterého chceme data přijímat.

Je tedy čas spustit a otestovat systém:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Všechno se povedlo! Jak můžete vidět na obrázku níže, když je aplikace spuštěna, nové výsledky agregace se zobrazují každé 2 sekundy, protože při vytváření objektu StreamingContext jsme nastavili interval sdružování na 2 sekundy:

Apache Kafka a streamování dat pomocí Spark Streaming

Dále provedeme jednoduchý dotaz do databáze, abychom zkontrolovali záznamy v tabulce transakce_tok:

Apache Kafka a streamování dat pomocí Spark Streaming

Závěr

V tomto článku byl zvažován příklad zpracování streamovaných informací pomocí Spark Streaming ve spojení s Apache Kafka a PostgreSQL. S růstem objemů dat z různých zdrojů je obtížné přeceňovat praktickou hodnotu Spark Streaming pro vytváření aplikací v reálném čase a streamování.

Celý zdrojový kód najdete v mém úložišti na adrese GitHub.

Rád tento článek prodiskutuji, těším se na vaše komentáře a také doufám v konstruktivní kritiku od všech dotčených čtenářů.

Přeji vám úspěch!

Ps. Původně bylo plánováno použití lokální PostgreSQL databáze, ale vzhledem k mé lásce k AWS jsem se rozhodl databázi přesunout do cloudu. V příštím článku na toto téma vám ukážu, jak celý výše popsaný systém implementovat do AWS pomocí AWS Kinesis a AWS EMR. Sledujte novinky!

Zdroj: www.habr.com

Přidat komentář