Hej Habr! I dag vil vi bygge et system, der vil behandle Apache Kafka-meddelelsesstrømme ved hjælp af Spark Streaming og skrive behandlingsresultatet til AWS RDS-skydatabasen.
Lad os forestille os, at et bestemt kreditinstitut stiller os opgaven med at behandle indgående transaktioner "on the fly" for alle sine filialer. Dette kan ske med det formål hurtigt at beregne den åbne valutaposition for statskassen, limits eller økonomisk resultat på transaktioner mv.
Sådan implementerer du denne sag uden brug af magi og magiske besværgelser - læs under klippet! Gå!
Naturligvis giver realtidsbehandling af en stor mængde data rige muligheder for brug i moderne systemer. En af de mest populære kombinationer for dette er tandem af Apache Kafka og Spark Streaming, hvor Kafka opretter en strøm af indgående beskedpakker, og Spark Streaming behandler disse pakker med et bestemt tidsinterval.
For at forbedre applikationens fejltolerance vil vi bruge checkpoints - checkpoints. Med denne mekanisme, når Spark Streaming-modulet skal gendanne tabte data, behøver det kun at vende tilbage til det sidste kontrolpunkt og genoptage beregninger derfra.
Arkitekturen af det udviklede system
Brugte komponenter:
Apache Kafka er et distribueret udgiv-og-abonner-meddelelsessystem. Velegnet til både offline og online beskedforbrug. For at forhindre tab af data gemmes Kafka-meddelelser på disken og replikeres i klyngen. Kafka-systemet er bygget oven på ZooKeeper-synkroniseringstjenesten;
Apache Spark-streaming - en Spark-komponent til behandling af streamingdata. Spark Streaming-modulet er bygget ved hjælp af en mikro-batch-arkitektur, når en datastrøm fortolkes som en kontinuerlig sekvens af små datapakker. Spark Streaming tager data fra forskellige kilder og kombinerer dem i små batches. Nye pakker oprettes med jævne mellemrum. Ved starten af hvert tidsinterval oprettes en ny pakke, og alle data, der modtages i dette interval, er inkluderet i pakken. Ved slutningen af intervallet stopper pakkevæksten. Størrelsen af intervallet bestemmes af en parameter kaldet batchintervallet;
Apache Spark SQL - Kombinerer relationel behandling med Spark funktionel programmering. Strukturerede data refererer til data, der har et skema, det vil sige et enkelt sæt felter for alle poster. Spark SQL understøtter input fra en række strukturerede datakilder, og på grund af tilstedeværelsen af skemaoplysninger kan den effektivt kun udtrække de påkrævede felter af poster, og den leverer også DataFrame API'er;
AWS RDS er en relativt billig cloud-baseret relationsdatabase, en webservice, der forenkler opsætning, drift og skalering, administreret direkte af Amazon.
Installation og kørsel af Kafka-serveren
Før du bruger Kafka direkte, skal du sikre dig, at du har Java, pga JVM bruges til arbejde:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Det næste trin er valgfrit. Faktum er, at standardindstillingerne ikke tillader dig fuldt ud at bruge alle funktionerne i Apache Kafka. Slet for eksempel et emne, kategori, gruppe, som meddelelser kan publiceres på. For at ændre dette, lad os redigere konfigurationsfilen:
vim ~/kafka/config/server.properties
Tilføj følgende til slutningen af filen:
delete.topic.enable = true
Før du starter Kafka-serveren, skal du starte ZooKeeper-serveren, vi vil bruge hjælpescriptet, der følger med Kafka-distributionen:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Efter at ZooKeeper er startet, starter vi Kafka-serveren i en separat terminal:
Lad os gå glip af øjeblikke med at teste producenten og forbrugeren for det nyoprettede emne. Flere detaljer om, hvordan du kan teste at sende og modtage beskeder er skrevet i den officielle dokumentation - Send nogle beskeder. Nå, vi går videre til at skrive en producent i Python ved hjælp af KafkaProducer API.
Producer skriver
Producenten vil generere tilfældige data - 100 beskeder hvert sekund. Med tilfældige data mener vi en ordbog bestående af tre felter:
Branch — navnet på kreditinstituttets salgssted;
Valuta — transaktionsvaluta;
beløb - overførselsbeløb. Beløbet vil være positivt, hvis det er et køb af valuta fra banken, og negativt, hvis det er et salg.
Dernæst, ved hjælp af sendemetoden, sender vi en besked til serveren, til det emne, vi har brug for, i JSON-format:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Når vi kører scriptet, får vi følgende meddelelser i terminalen:
Det betyder, at alt fungerer, som vi ønskede – producenten genererer og sender beskeder til det emne, vi har brug for.
Det næste trin er at installere Spark og behandle dette meddelelsesflow.
Installation af Apache Spark
Apache Spark er en alsidig og højtydende klyngecomputerplatform.
Spark udkonkurrerer populære implementeringer af MapReduce-modellen med hensyn til ydeevne, samtidig med at den understøtter en bredere række af beregningstyper, herunder interaktive forespørgsler og streaming. Hastighed spiller en vigtig rolle ved behandling af store mængder data, da det er hastighed, der giver dig mulighed for at arbejde interaktivt uden at bruge minutter eller timer på at vente. En af Sparks største styrker til at levere denne hastighed er dens evne til at udføre in-memory beregninger.
Denne ramme er skrevet i Scala, så du skal installere den først:
sudo apt-get install scala
Download Spark-distributionen fra den officielle hjemmeside:
Kør kommandoen nedenfor efter at have foretaget ændringer til bashrc:
source ~/.bashrc
Implementering af AWS PostgreSQL
Det er tilbage at udvide databasen, hvor vi vil udfylde de behandlede oplysninger fra strømmene. For at gøre dette vil vi bruge AWS RDS-tjenesten.
Gå til AWS-konsollen -> AWS RDS -> Databaser -> Opret database:
Vælg PostgreSQL, og klik på knappen Næste:
Fordi dette eksempel analyseres udelukkende til uddannelsesformål, vi vil bruge en gratis server "som minimum" (Free Tier):
Tjek derefter Free Tier-blokken, og derefter vil vi automatisk blive tilbudt en forekomst af t2.micro-klassen - selvom det er svagt, men gratis og ganske velegnet til vores opgave:
Meget vigtige ting følger: navnet på DB-instansen, navnet på masterbrugeren og dennes adgangskode. Lad os kalde forekomsten: myHabrTest, hovedbruger: habr, adgangskode: habr12345 og klik på knappen Næste:
Den næste side indeholder de parametre, der er ansvarlige for tilgængeligheden af vores databaseserver udefra (Public accessibility) og tilgængeligheden af porte:
Lad os oprette en ny indstilling for VPC-sikkerhedsgruppen, som tillader ekstern adgang til vores databaseserver via port 5432 (PostgreSQL).
Lad os gå i et separat browservindue til AWS-konsollen i VPC Dashboard -> Sikkerhedsgrupper -> Opret sikkerhedsgruppesektion:
Vi sætter navnet på sikkerhedsgruppen - PostgreSQL, en beskrivelse, specificerer hvilken VPC denne gruppe skal være tilknyttet og klik på knappen Opret:
Vi udfylder den nyoprettede gruppe Indgående regler for port 5432, som vist på billedet nedenfor. Du kan ikke angive porten manuelt, men vælg PostgreSQL fra rullelisten Type.
Strengt taget betyder værdien ::/0 tilgængeligheden af indgående trafik til serveren fra hele verden, hvilket ikke er helt sandt kanonisk, men for at analysere eksemplet, lad os bruge denne tilgang:
Vi vender tilbage til browsersiden, hvor vi har "Konfigurer avancerede indstillinger" åben og vælger VPC-sikkerhedsgrupper -> Vælg eksisterende VPC-sikkerhedsgrupper -> PostgreSQL i sektionen:
Dernæst i afsnittet Databaseindstillinger -> Databasenavn -> indstil navnet - habrDB.
Vi kan lade resten af parametrene, med undtagelse af deaktivering af backup (backup-opbevaringsperiode - 0 dage), overvågning og Performance Insights, stå som standard. Klik på knappen Opret database:
Stream Handler
Den sidste fase vil være udviklingen af et Spark-job, som vil behandle nye data fra Kafka hvert andet sekund og indtaste resultatet i databasen.
Som nævnt ovenfor er kontrolpunkter hovedmekanismen i SparkStreaming, der skal konfigureres til at give fejltolerance. Vi vil bruge kontrolpunkter, og i tilfælde af en procedurefejl skal Spark Streaming-modulet kun vende tilbage til det sidste kontrolpunkt og genoptage beregninger fra det for at gendanne de tabte data.
Et kontrolpunkt kan aktiveres ved at indstille en mappe på et fejltolerant, pålideligt filsystem (f.eks. HDFS, S3 osv.), hvor kontrolpunktinformationen vil blive gemt. Dette gøres med for eksempel:
streamingContext.checkpoint(checkpointDirectory)
I vores eksempel vil vi bruge følgende tilgang, nemlig hvis checkpointDirectory eksisterer, så vil konteksten blive genskabt fra checkpointdataene. Hvis mappen ikke eksisterer (dvs. den udføres for første gang), så kaldes functionToCreateContext-funktionen for at skabe en ny kontekst og opsætte DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Vi opretter et DirectStream-objekt for at forbinde til "transaktion"-emnet ved at bruge createDirectStream-metoden i KafkaUtils-biblioteket:
Ved at bruge Spark SQL laver vi en simpel gruppering og sender resultatet til konsollen:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Hentning af forespørgselsteksten og kørsel gennem Spark SQL:
Og så gemmer vi de modtagne aggregerede data i en tabel i AWS RDS. For at gemme resultaterne af aggregeringen til en databasetabel, vil vi bruge skrivemetoden for DataFrame-objektet:
Et par ord om at oprette en forbindelse til AWS RDS. Vi oprettede brugeren og adgangskoden til det i trinnet "Deploying AWS PostgreSQL". Som url til databaseserveren skal du bruge Endpoint, som vises i afsnittet Forbindelse og sikkerhed:
For at forbinde Spark og Kafka korrekt, skal du køre jobbet gennem smark-submit ved hjælp af artefakten spark-streaming-kafka-0-8_2.11. Derudover vil vi også bruge en artefakt til at interagere med PostgreSQL-databasen, vi vil sende dem gennem --pakker.
For fleksibilitet af scriptet vil vi også tage navnet på beskedserveren og det emne, som vi ønsker at modtage data fra, som inputparametre.
Alt fungerede! Som du kan se på billedet nedenfor, mens applikationen kører, vises nye aggregeringsresultater hvert 2. sekund, fordi vi satte bundlingintervallet til 2 sekunder, da vi oprettede StreamingContext-objektet:
Dernæst laver vi en simpel forespørgsel til databasen for at tjekke for poster i tabellen transaktionsflow:
Konklusion
I denne artikel blev et eksempel på streaming af informationsbehandling ved hjælp af Spark Streaming i forbindelse med Apache Kafka og PostgreSQL overvejet. Med væksten i mængder af data fra forskellige kilder er det svært at overvurdere den praktiske værdi af Spark Streaming til at skabe realtids- og streamingapplikationer.
Du kan finde den fulde kildekode i mit lager på GitHub.
Jeg er glad for at diskutere denne artikel, jeg ser frem til dine kommentarer, og jeg håber også på konstruktiv kritik fra alle berørte læsere.
Jeg ønsker dig succes!
Ps. Det var oprindeligt planlagt at bruge en lokal PostgreSQL-database, men på grund af min kærlighed til AWS besluttede jeg at flytte databasen til skyen. I den næste artikel om dette emne vil jeg vise dig, hvordan du implementerer hele systemet beskrevet ovenfor i AWS ved hjælp af AWS Kinesis og AWS EMR. Følg nyhederne!