Apache Kafka och Streaming Data Processing med Spark Streaming
Hej, Habr! Idag kommer vi att bygga ett system som kommer att behandla Apache Kafka-meddelandeströmmar med hjälp av Spark Streaming och skriva bearbetningsresultaten till AWS RDS molndatabas.
Låt oss föreställa oss att ett visst kreditinstitut ger oss uppgiften att behandla inkommande transaktioner "i farten" över alla sina filialer. Detta kan göras i syfte att snabbt beräkna en öppen valutaposition för statskassan, limiter eller finansiella resultat för transaktioner etc.
Hur man implementerar detta fall utan användning av magi och magiska trollformler - läs under klippet! Gå!
Att bearbeta en stor mängd data i realtid ger givetvis stora möjligheter att använda i moderna system. En av de mest populära kombinationerna för detta är tandem av Apache Kafka och Spark Streaming, där Kafka skapar en ström av inkommande meddelandepaket, och Spark Streaming bearbetar dessa paket vid ett givet tidsintervall.
För att öka applikationens feltolerans kommer vi att använda kontrollpunkter. Med denna mekanism, när Spark Streaming-motorn behöver återställa förlorad data, behöver den bara gå tillbaka till den sista kontrollpunkten och återuppta beräkningarna därifrån.
Det utvecklade systemets arkitektur
Komponenter som används:
Apache Kafka är ett distribuerat meddelandesystem för publicering och prenumeration. Lämplig för både offline- och online-meddelandekonsumtion. För att förhindra dataförlust lagras Kafka-meddelanden på disk och replikeras i klustret. Kafka-systemet är byggt ovanpå ZooKeepers synkroniseringstjänst;
Apache Spark Streaming - Spark-komponent för bearbetning av strömmande data. Spark Streaming-modulen är byggd med en mikrobatcharkitektur, där dataströmmen tolkas som en kontinuerlig sekvens av små datapaket. Spark Streaming tar data från olika källor och kombinerar det till små paket. Nya paket skapas med jämna mellanrum. I början av varje tidsintervall skapas ett nytt paket och all data som tas emot under det intervallet inkluderas i paketet. I slutet av intervallet upphör pakettillväxten. Storleken på intervallet bestäms av en parameter som kallas batchintervall;
Apache Spark SQL - kombinerar relationsbearbetning med Spark funktionell programmering. Strukturerad data betyder data som har ett schema, det vill säga en enda uppsättning fält för alla poster. Spark SQL stöder input från en mängd olika strukturerade datakällor och tack vare tillgången på schemainformation kan den effektivt hämta endast de obligatoriska fälten med poster, och tillhandahåller även DataFrame API:er;
AWS RDS är en relativt billig molnbaserad relationsdatabas, webbtjänst som förenklar installation, drift och skalning, och administreras direkt av Amazon.
Installera och köra Kafka-servern
Innan du använder Kafka direkt måste du se till att du har Java, eftersom... JVM används för arbete:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Nästa steg är valfritt. Faktum är att standardinställningarna inte tillåter dig att fullt ut använda alla funktioner i Apache Kafka. Ta till exempel bort ett ämne, kategori, grupp som meddelanden kan publiceras till. För att ändra detta, låt oss redigera konfigurationsfilen:
vim ~/kafka/config/server.properties
Lägg till följande i slutet av filen:
delete.topic.enable = true
Innan du startar Kafka-servern måste du starta ZooKeeper-servern; vi kommer att använda hjälpskriptet som följer med Kafka-distributionen:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
När ZooKeeper har startat framgångsrikt, starta Kafka-servern i en separat terminal:
Låt oss missa ögonblicken att testa producenten och konsumenten för det nyskapade ämnet. Mer information om hur du kan testa att skicka och ta emot meddelanden finns i den officiella dokumentationen - Skicka några meddelanden. Tja, vi går vidare till att skriva en producent i Python med KafkaProducer API.
Producent skriver
Producenten kommer att generera slumpmässiga data - 100 meddelanden varje sekund. Med slumpmässiga data menar vi en ordbok som består av tre fält:
Branch — Namnet på kreditinstitutets försäljningsställe.
Valuta — Transaktionsvaluta.
Antal - Transaktionsbelopp. Beloppet kommer att vara ett positivt tal om det är ett köp av valuta av banken och ett negativt tal om det är en försäljning.
Därefter, med hjälp av sändmetoden, skickar vi ett meddelande till servern, till det ämne vi behöver, i JSON-format:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
När vi kör skriptet får vi följande meddelanden i terminalen:
Det betyder att allt fungerar som vi ville – producenten genererar och skickar meddelanden till det ämne vi behöver.
Nästa steg är att installera Spark och bearbeta denna meddelandeström.
Installerar Apache Spark
Apache Spark är en universell och högpresterande klusterdatorplattform.
Spark presterar bättre än populära implementeringar av MapReduce-modellen samtidigt som de stöder ett bredare utbud av beräkningstyper, inklusive interaktiva frågor och strömbehandling. Hastighet spelar en viktig roll vid bearbetning av stora datamängder, eftersom det är hastigheten som gör att du kan arbeta interaktivt utan att behöva vänta några minuter eller timmar. En av Sparks största styrkor som gör den så snabb är dess förmåga att utföra beräkningar i minnet.
Detta ramverk är skrivet i Scala, så du måste installera det först:
sudo apt-get install scala
Ladda ner Spark-distributionen från den officiella webbplatsen:
Kör kommandot nedan efter att ha gjort ändringar i bashrc:
source ~/.bashrc
Distribuera AWS PostgreSQL
Allt som återstår är att distribuera databasen till vilken vi laddar upp den bearbetade informationen från strömmarna. För detta kommer vi att använda AWS RDS-tjänsten.
Gå till AWS-konsolen -> AWS RDS -> Databaser -> Skapa databas:
Välj PostgreSQL och klicka på Nästa:
Därför att Det här exemplet är endast för utbildningsändamål; vi kommer att använda en gratis server "minst" (gratis nivå):
Därefter sätter vi en bock i Free Tier-blocket, och efter det kommer vi automatiskt att erbjudas en instans av klassen t2.micro - även om den är svag, är den gratis och ganska lämplig för vår uppgift:
Därefter kommer mycket viktiga saker: namnet på databasinstansen, namnet på huvudanvändaren och hans lösenord. Låt oss namnge instansen: myHabrTest, master user: habr, Lösenord: habr12345 och klicka på knappen Nästa:
På nästa sida finns parametrar som ansvarar för tillgängligheten till vår databasserver utifrån (Public accessibility) och porttillgänglighet:
Låt oss skapa en ny inställning för VPC-säkerhetsgruppen, som tillåter extern åtkomst till vår databasserver via port 5432 (PostgreSQL).
Låt oss gå till AWS-konsolen i ett separat webbläsarfönster till VPC Dashboard -> Säkerhetsgrupper -> Skapa säkerhetsgruppsektion:
Vi anger namnet för säkerhetsgruppen - PostgreSQL, en beskrivning, anger vilken VPC denna grupp ska associeras med och klickar på knappen Skapa:
Fyll i Inkommande regler för port 5432 för den nyskapade gruppen, som visas på bilden nedan. Du kan inte ange porten manuellt, men välj PostgreSQL från rullgardinsmenyn Typ.
Strängt taget betyder värdet ::/0 tillgängligheten av inkommande trafik till servern från hela världen, vilket kanoniskt inte är helt sant, men för att analysera exemplet, låt oss tillåta oss att använda detta tillvägagångssätt:
Vi återvänder till webbläsarsidan, där vi har "Konfigurera avancerade inställningar" öppen och väljer i avsnittet VPC-säkerhetsgrupper -> Välj befintliga VPC-säkerhetsgrupper -> PostgreSQL:
Därefter, i Databasalternativ -> Databasnamn -> ställ in namnet - habrDB.
Vi kan lämna de återstående parametrarna, med undantag för att inaktivera säkerhetskopiering (retentionstid för säkerhetskopiering - 0 dagar), övervakning och prestandainsikter, som standard. Klicka på knappen Skapa databas:
Trådhanterare
Det sista steget kommer att vara utvecklingen av ett Spark-jobb, som kommer att bearbeta ny data som kommer från Kafka varannan sekund och lägga in resultatet i databasen.
Som nämnts ovan är kontrollpunkter en kärnmekanism i SparkStreaming som måste konfigureras för att säkerställa feltolerans. Vi kommer att använda kontrollpunkter och, om proceduren misslyckas, behöver Spark Streaming-modulen bara återgå till den sista kontrollpunkten och återuppta beräkningar från den för att återställa förlorad data.
Checkpointing kan aktiveras genom att ställa in en katalog på ett feltolerant, pålitligt filsystem (som HDFS, S3, etc.) där checkpointinformationen kommer att lagras. Detta görs med till exempel:
streamingContext.checkpoint(checkpointDirectory)
I vårt exempel kommer vi att använda följande tillvägagångssätt, nämligen, om checkpointDirectory finns, kommer kontexten att återskapas från kontrollpunktsdata. Om katalogen inte finns (dvs exekveras för första gången), anropas functionToCreateContext för att skapa en ny kontext och konfigurera DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Vi skapar ett DirectStream-objekt för att ansluta till "transaktions"-ämnet med metoden createDirectStream i KafkaUtils-biblioteket:
Med Spark SQL gör vi en enkel gruppering och visar resultatet i konsolen:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Och sedan sparar vi den resulterande aggregerade datan i en tabell i AWS RDS. För att spara aggregeringsresultaten till en databastabell kommer vi att använda skrivmetoden för DataFrame-objektet:
Några ord om att sätta upp en anslutning till AWS RDS. Vi skapade användaren och lösenordet för det i steget "Deploying AWS PostgreSQL". Du bör använda Endpoint som databasserverns webbadress, som visas i avsnittet Anslutningar och säkerhet:
För att korrekt koppla ihop Spark och Kafka bör du köra jobbet via smark-submit med hjälp av artefakten spark-streaming-kafka-0-8_2.11. Dessutom kommer vi också att använda en artefakt för att interagera med PostgreSQL-databasen; vi kommer att överföra dem via --paket.
För flexibiliteten för skriptet kommer vi också att inkludera som indataparametrar namnet på meddelandeservern och ämnet från vilket vi vill ta emot data.
Så det är dags att starta och kontrollera systemets funktionalitet:
Allt löste sig! Som du kan se på bilden nedan, medan applikationen körs, matas nya aggregeringsresultat ut varannan sekund, eftersom vi ställde in batchintervallet till 2 sekunder när vi skapade StreamingContext-objektet:
Därefter gör vi en enkel fråga till databasen för att kontrollera närvaron av poster i tabellen transaktionsflöde:
Slutsats
Den här artikeln tittade på ett exempel på strömbehandling av information med Spark Streaming i kombination med Apache Kafka och PostgreSQL. Med tillväxten av data från olika källor är det svårt att överskatta det praktiska värdet av Spark Streaming för att skapa streaming- och realtidsapplikationer.
Du kan hitta hela källkoden i mitt arkiv på GitHub.
Jag diskuterar gärna den här artikeln, jag ser fram emot dina kommentarer, och jag hoppas också på konstruktiv kritik från alla omtänksamma läsare.
Jag önskar dig framgång!
Ps. Från början var det planerat att använda en lokal PostgreSQL-databas, men med tanke på min kärlek till AWS bestämde jag mig för att flytta databasen till molnet. I nästa artikel om detta ämne kommer jag att visa hur man implementerar hela systemet som beskrivs ovan i AWS med hjälp av AWS Kinesis och AWS EMR. Följ nyheterna!