Apache Kafka och Streaming Data Processing med Spark Streaming

Hej, Habr! Idag kommer vi att bygga ett system som kommer att behandla Apache Kafka-meddelandeströmmar med hjälp av Spark Streaming och skriva bearbetningsresultaten till AWS RDS molndatabas.

Låt oss föreställa oss att ett visst kreditinstitut ger oss uppgiften att behandla inkommande transaktioner "i farten" över alla sina filialer. Detta kan göras i syfte att snabbt beräkna en öppen valutaposition för statskassan, limiter eller finansiella resultat för transaktioner etc.

Hur man implementerar detta fall utan användning av magi och magiska trollformler - läs under klippet! Gå!

Apache Kafka och Streaming Data Processing med Spark Streaming
(Bildkälla)

Inledning

Att bearbeta en stor mängd data i realtid ger givetvis stora möjligheter att använda i moderna system. En av de mest populära kombinationerna för detta är tandem av Apache Kafka och Spark Streaming, där Kafka skapar en ström av inkommande meddelandepaket, och Spark Streaming bearbetar dessa paket vid ett givet tidsintervall.

För att öka applikationens feltolerans kommer vi att använda kontrollpunkter. Med denna mekanism, när Spark Streaming-motorn behöver återställa förlorad data, behöver den bara gå tillbaka till den sista kontrollpunkten och återuppta beräkningarna därifrån.

Det utvecklade systemets arkitektur

Apache Kafka och Streaming Data Processing med Spark Streaming

Komponenter som används:

  • Apache Kafka är ett distribuerat meddelandesystem för publicering och prenumeration. Lämplig för både offline- och online-meddelandekonsumtion. För att förhindra dataförlust lagras Kafka-meddelanden på disk och replikeras i klustret. Kafka-systemet är byggt ovanpå ZooKeepers synkroniseringstjänst;
  • Apache Spark Streaming - Spark-komponent för bearbetning av strömmande data. Spark Streaming-modulen är byggd med en mikrobatcharkitektur, där dataströmmen tolkas som en kontinuerlig sekvens av små datapaket. Spark Streaming tar data från olika källor och kombinerar det till små paket. Nya paket skapas med jämna mellanrum. I början av varje tidsintervall skapas ett nytt paket och all data som tas emot under det intervallet inkluderas i paketet. I slutet av intervallet upphör pakettillväxten. Storleken på intervallet bestäms av en parameter som kallas batchintervall;
  • Apache Spark SQL - kombinerar relationsbearbetning med Spark funktionell programmering. Strukturerad data betyder data som har ett schema, det vill säga en enda uppsättning fält för alla poster. Spark SQL stöder input från en mängd olika strukturerade datakällor och tack vare tillgången på schemainformation kan den effektivt hämta endast de obligatoriska fälten med poster, och tillhandahåller även DataFrame API:er;
  • AWS RDS är en relativt billig molnbaserad relationsdatabas, webbtjänst som förenklar installation, drift och skalning, och administreras direkt av Amazon.

Installera och köra Kafka-servern

Innan du använder Kafka direkt måste du se till att du har Java, eftersom... JVM används för arbete:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Låt oss skapa en ny användare att arbeta med Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Ladda sedan ner distributionen från den officiella Apache Kafka-webbplatsen:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Packa upp det nedladdade arkivet:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Nästa steg är valfritt. Faktum är att standardinställningarna inte tillåter dig att fullt ut använda alla funktioner i Apache Kafka. Ta till exempel bort ett ämne, kategori, grupp som meddelanden kan publiceras till. För att ändra detta, låt oss redigera konfigurationsfilen:

vim ~/kafka/config/server.properties

Lägg till följande i slutet av filen:

delete.topic.enable = true

Innan du startar Kafka-servern måste du starta ZooKeeper-servern; vi kommer att använda hjälpskriptet som följer med Kafka-distributionen:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

När ZooKeeper har startat framgångsrikt, starta Kafka-servern i en separat terminal:

bin/kafka-server-start.sh config/server.properties

Låt oss skapa ett nytt ämne som heter Transaktion:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Låt oss se till att ett ämne med det nödvändiga antalet partitioner och replikering har skapats:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka och Streaming Data Processing med Spark Streaming

Låt oss missa ögonblicken att testa producenten och konsumenten för det nyskapade ämnet. Mer information om hur du kan testa att skicka och ta emot meddelanden finns i den officiella dokumentationen - Skicka några meddelanden. Tja, vi går vidare till att skriva en producent i Python med KafkaProducer API.

Producent skriver

Producenten kommer att generera slumpmässiga data - 100 meddelanden varje sekund. Med slumpmässiga data menar vi en ordbok som består av tre fält:

  • Branch — Namnet på kreditinstitutets försäljningsställe.
  • Valuta — Transaktionsvaluta.
  • Antal - Transaktionsbelopp. Beloppet kommer att vara ett positivt tal om det är ett köp av valuta av banken och ett negativt tal om det är en försäljning.

Koden för producenten ser ut så här:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Därefter, med hjälp av sändmetoden, skickar vi ett meddelande till servern, till det ämne vi behöver, i JSON-format:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

När vi kör skriptet får vi följande meddelanden i terminalen:

Apache Kafka och Streaming Data Processing med Spark Streaming

Det betyder att allt fungerar som vi ville – producenten genererar och skickar meddelanden till det ämne vi behöver.
Nästa steg är att installera Spark och bearbeta denna meddelandeström.

Installerar Apache Spark

Apache Spark är en universell och högpresterande klusterdatorplattform.

Spark presterar bättre än populära implementeringar av MapReduce-modellen samtidigt som de stöder ett bredare utbud av beräkningstyper, inklusive interaktiva frågor och strömbehandling. Hastighet spelar en viktig roll vid bearbetning av stora datamängder, eftersom det är hastigheten som gör att du kan arbeta interaktivt utan att behöva vänta några minuter eller timmar. En av Sparks största styrkor som gör den så snabb är dess förmåga att utföra beräkningar i minnet.

Detta ramverk är skrivet i Scala, så du måste installera det först:

sudo apt-get install scala

Ladda ner Spark-distributionen från den officiella webbplatsen:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Packa upp arkivet:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Lägg till sökvägen till Spark i bash-filen:

vim ~/.bashrc

Lägg till följande rader genom redigeraren:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Kör kommandot nedan efter att ha gjort ändringar i bashrc:

source ~/.bashrc

Distribuera AWS PostgreSQL

Allt som återstår är att distribuera databasen till vilken vi laddar upp den bearbetade informationen från strömmarna. För detta kommer vi att använda AWS RDS-tjänsten.

Gå till AWS-konsolen -> AWS RDS -> Databaser -> Skapa databas:
Apache Kafka och Streaming Data Processing med Spark Streaming

Välj PostgreSQL och klicka på Nästa:
Apache Kafka och Streaming Data Processing med Spark Streaming

Därför att Det här exemplet är endast för utbildningsändamål; vi kommer att använda en gratis server "minst" (gratis nivå):
Apache Kafka och Streaming Data Processing med Spark Streaming

Därefter sätter vi en bock i Free Tier-blocket, och efter det kommer vi automatiskt att erbjudas en instans av klassen t2.micro - även om den är svag, är den gratis och ganska lämplig för vår uppgift:
Apache Kafka och Streaming Data Processing med Spark Streaming

Därefter kommer mycket viktiga saker: namnet på databasinstansen, namnet på huvudanvändaren och hans lösenord. Låt oss namnge instansen: myHabrTest, master user: habr, Lösenord: habr12345 och klicka på knappen Nästa:
Apache Kafka och Streaming Data Processing med Spark Streaming

På nästa sida finns parametrar som ansvarar för tillgängligheten till vår databasserver utifrån (Public accessibility) och porttillgänglighet:

Apache Kafka och Streaming Data Processing med Spark Streaming

Låt oss skapa en ny inställning för VPC-säkerhetsgruppen, som tillåter extern åtkomst till vår databasserver via port 5432 (PostgreSQL).
Låt oss gå till AWS-konsolen i ett separat webbläsarfönster till VPC Dashboard -> Säkerhetsgrupper -> Skapa säkerhetsgruppsektion:
Apache Kafka och Streaming Data Processing med Spark Streaming

Vi anger namnet för säkerhetsgruppen - PostgreSQL, en beskrivning, anger vilken VPC denna grupp ska associeras med och klickar på knappen Skapa:
Apache Kafka och Streaming Data Processing med Spark Streaming

Fyll i Inkommande regler för port 5432 för den nyskapade gruppen, som visas på bilden nedan. Du kan inte ange porten manuellt, men välj PostgreSQL från rullgardinsmenyn Typ.

Strängt taget betyder värdet ::/0 tillgängligheten av inkommande trafik till servern från hela världen, vilket kanoniskt inte är helt sant, men för att analysera exemplet, låt oss tillåta oss att använda detta tillvägagångssätt:
Apache Kafka och Streaming Data Processing med Spark Streaming

Vi återvänder till webbläsarsidan, där vi har "Konfigurera avancerade inställningar" öppen och väljer i avsnittet VPC-säkerhetsgrupper -> Välj befintliga VPC-säkerhetsgrupper -> PostgreSQL:
Apache Kafka och Streaming Data Processing med Spark Streaming

Därefter, i Databasalternativ -> Databasnamn -> ställ in namnet - habrDB.

Vi kan lämna de återstående parametrarna, med undantag för att inaktivera säkerhetskopiering (retentionstid för säkerhetskopiering - 0 dagar), övervakning och prestandainsikter, som standard. Klicka på knappen Skapa databas:
Apache Kafka och Streaming Data Processing med Spark Streaming

Trådhanterare

Det sista steget kommer att vara utvecklingen av ett Spark-jobb, som kommer att bearbeta ny data som kommer från Kafka varannan sekund och lägga in resultatet i databasen.

Som nämnts ovan är kontrollpunkter en kärnmekanism i SparkStreaming som måste konfigureras för att säkerställa feltolerans. Vi kommer att använda kontrollpunkter och, om proceduren misslyckas, behöver Spark Streaming-modulen bara återgå till den sista kontrollpunkten och återuppta beräkningar från den för att återställa förlorad data.

Checkpointing kan aktiveras genom att ställa in en katalog på ett feltolerant, pålitligt filsystem (som HDFS, S3, etc.) där checkpointinformationen kommer att lagras. Detta görs med till exempel:

streamingContext.checkpoint(checkpointDirectory)

I vårt exempel kommer vi att använda följande tillvägagångssätt, nämligen, om checkpointDirectory finns, kommer kontexten att återskapas från kontrollpunktsdata. Om katalogen inte finns (dvs exekveras för första gången), anropas functionToCreateContext för att skapa en ny kontext och konfigurera DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Vi skapar ett DirectStream-objekt för att ansluta till "transaktions"-ämnet med metoden createDirectStream i KafkaUtils-biblioteket:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Parsar inkommande data i JSON-format:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Med Spark SQL gör vi en enkel gruppering och visar resultatet i konsolen:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Hämta frågetexten och kör den genom Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Och sedan sparar vi den resulterande aggregerade datan i en tabell i AWS RDS. För att spara aggregeringsresultaten till en databastabell kommer vi att använda skrivmetoden för DataFrame-objektet:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Några ord om att sätta upp en anslutning till AWS RDS. Vi skapade användaren och lösenordet för det i steget "Deploying AWS PostgreSQL". Du bör använda Endpoint som databasserverns webbadress, som visas i avsnittet Anslutningar och säkerhet:

Apache Kafka och Streaming Data Processing med Spark Streaming

För att korrekt koppla ihop Spark och Kafka bör du köra jobbet via smark-submit med hjälp av artefakten spark-streaming-kafka-0-8_2.11. Dessutom kommer vi också att använda en artefakt för att interagera med PostgreSQL-databasen; vi kommer att överföra dem via --paket.

För flexibiliteten för skriptet kommer vi också att inkludera som indataparametrar namnet på meddelandeservern och ämnet från vilket vi vill ta emot data.

Så det är dags att starta och kontrollera systemets funktionalitet:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Allt löste sig! Som du kan se på bilden nedan, medan applikationen körs, matas nya aggregeringsresultat ut varannan sekund, eftersom vi ställde in batchintervallet till 2 sekunder när vi skapade StreamingContext-objektet:

Apache Kafka och Streaming Data Processing med Spark Streaming

Därefter gör vi en enkel fråga till databasen för att kontrollera närvaron av poster i tabellen transaktionsflöde:

Apache Kafka och Streaming Data Processing med Spark Streaming

Slutsats

Den här artikeln tittade på ett exempel på strömbehandling av information med Spark Streaming i kombination med Apache Kafka och PostgreSQL. Med tillväxten av data från olika källor är det svårt att överskatta det praktiska värdet av Spark Streaming för att skapa streaming- och realtidsapplikationer.

Du kan hitta hela källkoden i mitt arkiv på GitHub.

Jag diskuterar gärna den här artikeln, jag ser fram emot dina kommentarer, och jag hoppas också på konstruktiv kritik från alla omtänksamma läsare.

Jag önskar dig framgång!

Ps. Från början var det planerat att använda en lokal PostgreSQL-databas, men med tanke på min kärlek till AWS bestämde jag mig för att flytta databasen till molnet. I nästa artikel om detta ämne kommer jag att visa hur man implementerar hela systemet som beskrivs ovan i AWS med hjälp av AWS Kinesis och AWS EMR. Följ nyheterna!

Källa: will.com

Lägg en kommentar