Apache Kafka og streaming databehandling med Spark Streaming
Hei, Habr! I dag skal vi bygge et system som vil behandle Apache Kafka-meldingsstrømmer ved hjelp av Spark Streaming og skrive behandlingsresultatene til AWS RDS-skydatabasen.
La oss forestille oss at en bestemt kredittinstitusjon setter oss i oppgave å behandle innkommende transaksjoner "i farten" på tvers av alle sine filialer. Dette kan gjøres med det formål å raskt beregne en åpen valutaposisjon for statskassen, rammer eller økonomiske resultater for transaksjoner osv.
Hvordan implementere denne saken uten bruk av magi og magiske trollformler - les under kuttet! Gå!
Å behandle en stor mengde data i sanntid gir selvsagt gode muligheter for bruk i moderne systemer. En av de mest populære kombinasjonene for dette er tandem av Apache Kafka og Spark Streaming, der Kafka lager en strøm av innkommende meldingspakker, og Spark Streaming behandler disse pakkene ved et gitt tidsintervall.
For å øke feiltoleransen til applikasjonen vil vi bruke sjekkpunkter. Med denne mekanismen, når Spark Streaming-motoren trenger å gjenopprette tapte data, trenger den bare å gå tilbake til det siste sjekkpunktet og gjenoppta beregningene derfra.
Arkitektur av det utviklede systemet
Komponenter som brukes:
Apache Kafka er et distribuert publiser-abonner meldingssystem. Egnet for både offline og online meldingsforbruk. For å forhindre tap av data lagres Kafka-meldinger på disk og replikeres i klyngen. Kafka-systemet er bygget på toppen av ZooKeeper-synkroniseringstjenesten;
Apache Spark Streaming - Spark-komponent for behandling av strømmedata. Spark Streaming-modulen er bygget ved hjelp av en mikrobatch-arkitektur, hvor datastrømmen tolkes som en kontinuerlig sekvens av små datapakker. Spark Streaming tar data fra forskjellige kilder og kombinerer dem til små pakker. Nye pakker opprettes med jevne mellomrom. I begynnelsen av hvert tidsintervall opprettes en ny pakke, og alle data som mottas i løpet av det intervallet er inkludert i pakken. På slutten av intervallet stopper pakkeveksten. Størrelsen på intervallet bestemmes av en parameter kalt batchintervall;
Apache Spark SQL - kombinerer relasjonsbehandling med Spark funksjonell programmering. Strukturerte data betyr data som har et skjema, det vil si et enkelt sett med felt for alle poster. Spark SQL støtter input fra en rekke strukturerte datakilder, og takket være tilgjengeligheten av skjemainformasjon kan den effektivt hente bare de nødvendige postfeltene, og gir også DataFrame APIer;
AWS RDS er en relativt rimelig skybasert relasjonsdatabase, webtjeneste som forenkler oppsett, drift og skalering, og administreres direkte av Amazon.
Installere og kjøre Kafka-serveren
Før du bruker Kafka direkte, må du sørge for at du har Java, fordi... JVM brukes til arbeid:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Det neste trinnet er valgfritt. Faktum er at standardinnstillingene ikke lar deg bruke alle funksjonene til Apache Kafka fullt ut. Slett for eksempel et emne, kategori, gruppe som meldinger kan publiseres til. For å endre dette, la oss redigere konfigurasjonsfilen:
vim ~/kafka/config/server.properties
Legg til følgende på slutten av filen:
delete.topic.enable = true
Før du starter Kafka-serveren, må du starte ZooKeeper-serveren; vi vil bruke hjelpeskriptet som følger med Kafka-distribusjonen:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Etter at ZooKeeper har startet vellykket, start Kafka-serveren i en separat terminal:
La oss gå glipp av øyeblikkene med å teste produsenten og forbrukeren for det nyopprettede emnet. Flere detaljer om hvordan du kan teste sending og mottak av meldinger er skrevet i den offisielle dokumentasjonen - Send noen meldinger. Vel, vi går videre til å skrive en produsent i Python ved å bruke KafkaProducer API.
Produsent skriver
Produsenten vil generere tilfeldige data - 100 meldinger hvert sekund. Med tilfeldige data mener vi en ordbok som består av tre felt:
Branch – navnet på kredittinstitusjonens salgssted;
valuta — transaksjonsvaluta;
Beløp - transaksjons beløp. Beløpet vil være et positivt tall hvis det er et kjøp av valuta fra banken, og et negativt tall hvis det er et salg.
Deretter, ved å bruke sendemetoden, sender vi en melding til serveren, til emnet vi trenger, i JSON-format:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Når vi kjører skriptet, mottar vi følgende meldinger i terminalen:
Det betyr at alt fungerer som vi ønsket – produsenten genererer og sender meldinger til det temaet vi trenger.
Neste trinn er å installere Spark og behandle denne meldingsstrømmen.
Installerer Apache Spark
Apache Spark er en universell og høyytelses cluster computing-plattform.
Spark yter bedre enn populære implementeringer av MapReduce-modellen samtidig som den støtter et bredere spekter av beregningstyper, inkludert interaktive spørringer og strømbehandling. Hastighet spiller en viktig rolle ved behandling av store datamengder, siden det er hastighet som lar deg jobbe interaktivt uten å bruke minutter eller timer på å vente. En av Sparks største styrker som gjør den så rask er evnen til å utføre minneberegninger.
Dette rammeverket er skrevet i Scala, så du må installere det først:
sudo apt-get install scala
Last ned Spark-distribusjonen fra den offisielle nettsiden:
Kjør kommandoen nedenfor etter å ha gjort endringer i bashrc:
source ~/.bashrc
Distribuerer AWS PostgreSQL
Alt som gjenstår er å distribuere databasen der vi skal laste opp den behandlede informasjonen fra strømmene. Til dette vil vi bruke AWS RDS-tjenesten.
Gå til AWS-konsollen -> AWS RDS -> Databaser -> Opprett database:
Velg PostgreSQL og klikk Neste:
Fordi Dette eksemplet er kun for pedagogiske formål; vi vil bruke en gratis server "minst" (gratis nivå):
Deretter setter vi en hake i Free Tier-blokken, og etter det vil vi automatisk bli tilbudt en forekomst av t2.micro-klassen - selv om den er svak, er den gratis og ganske egnet for oppgaven vår:
Deretter kommer svært viktige ting: navnet på databaseforekomsten, navnet på hovedbrukeren og passordet hans. La oss gi forekomsten navn: myHabrTest, hovedbruker: habr, passord: habr12345 og klikk på Neste-knappen:
På neste side er det parametere som er ansvarlige for tilgjengeligheten til databaseserveren vår fra utsiden (Offentlig tilgjengelighet) og porttilgjengelighet:
La oss lage en ny innstilling for VPC-sikkerhetsgruppen, som vil tillate ekstern tilgang til databaseserveren vår via port 5432 (PostgreSQL).
La oss gå til AWS-konsollen i et eget nettleservindu til VPC Dashboard -> Sikkerhetsgrupper -> Opprett sikkerhetsgruppe:
Vi setter navnet på sikkerhetsgruppen - PostgreSQL, en beskrivelse, indikerer hvilken VPC denne gruppen skal assosieres med og klikker på Opprett-knappen:
Fyll inn innkommende regler for port 5432 for den nyopprettede gruppen, som vist på bildet nedenfor. Du kan ikke spesifisere porten manuelt, men velg PostgreSQL fra rullegardinlisten Type.
Strengt tatt betyr verdien ::/0 tilgjengeligheten av innkommende trafikk til serveren fra hele verden, noe som kanonisk ikke er helt sant, men for å analysere eksemplet, la oss tillate oss å bruke denne tilnærmingen:
Vi går tilbake til nettlesersiden, hvor vi har "Konfigurer avanserte innstillinger" åpen og velger i delen VPC-sikkerhetsgrupper -> Velg eksisterende VPC-sikkerhetsgrupper -> PostgreSQL:
Deretter, i Databasealternativene -> Databasenavn -> angi navnet - habrDB.
Vi kan la de gjenværende parametrene, med unntak av deaktivering av sikkerhetskopiering (reserveoppbevaringsperiode - 0 dager), overvåking og ytelsesinnsikt, stå som standard. Klikk på knappen Lag database:
Trådbehandler
Den siste fasen vil være utviklingen av en Spark-jobb, som vil behandle nye data som kommer fra Kafka annethvert sekund og legge resultatet inn i databasen.
Som nevnt ovenfor er sjekkpunkter en kjernemekanisme i SparkStreaming som må konfigureres for å sikre feiltoleranse. Vi vil bruke sjekkpunkter, og hvis prosedyren mislykkes, trenger Spark Streaming-modulen bare å gå tilbake til det siste sjekkpunktet og gjenoppta beregningene fra det for å gjenopprette tapte data.
Kontrollpunkt kan aktiveres ved å sette en katalog på et feiltolerant, pålitelig filsystem (som HDFS, S3, etc.) der kontrollpunktinformasjonen vil bli lagret. Dette gjøres ved å for eksempel:
streamingContext.checkpoint(checkpointDirectory)
I vårt eksempel vil vi bruke følgende tilnærming, nemlig hvis checkpointDirectory eksisterer, vil konteksten bli gjenskapt fra kontrollpunktdataene. Hvis katalogen ikke eksisterer (dvs. kjøres for første gang), kalles functionToCreateContext for å opprette en ny kontekst og konfigurere DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Vi oppretter et DirectStream-objekt for å koble til "transaksjon"-emnet ved å bruke createDirectStream-metoden til KafkaUtils-biblioteket:
Ved å bruke Spark SQL gjør vi en enkel gruppering og viser resultatet i konsollen:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Og så lagrer vi de resulterende aggregerte dataene i en tabell i AWS RDS. For å lagre aggregeringsresultatene til en databasetabell, bruker vi skrivemetoden til DataFrame-objektet:
Noen få ord om å sette opp en tilkobling til AWS RDS. Vi opprettet brukeren og passordet for det i trinnet "Deploying AWS PostgreSQL". Du bør bruke Endpoint som databaseserver-url, som vises i delen Tilkobling og sikkerhet:
For å koble Spark og Kafka riktig, bør du kjøre jobben via smark-submit ved å bruke artefakten spark-streaming-kafka-0-8_2.11. I tillegg vil vi også bruke en artefakt for å samhandle med PostgreSQL-databasen; vi vil overføre dem via --packages.
For fleksibiliteten til skriptet vil vi også inkludere som inputparametere navnet på meldingsserveren og emnet som vi ønsker å motta data fra.
Så det er på tide å starte og sjekke systemets funksjonalitet:
Alt ordnet seg! Som du kan se på bildet nedenfor, mens applikasjonen kjører, sendes nye aggregeringsresultater ut hvert 2. sekund, fordi vi satte batchintervallet til 2 sekunder da vi opprettet StreamingContext-objektet:
Deretter gjør vi en enkel spørring til databasen for å sjekke tilstedeværelsen av poster i tabellen transaksjonsflyt:
Konklusjon
Denne artikkelen så på et eksempel på strømbehandling av informasjon ved bruk av Spark Streaming i forbindelse med Apache Kafka og PostgreSQL. Med veksten av data fra ulike kilder, er det vanskelig å overvurdere den praktiske verdien av Spark Streaming for å lage streaming og sanntidsapplikasjoner.
Du kan finne hele kildekoden i mitt depot på GitHub.
Jeg er glad for å diskutere denne artikkelen, jeg ser frem til dine kommentarer, og jeg håper også på konstruktiv kritikk fra alle omsorgsfulle lesere.
Jeg ønsker deg suksess!
Ps. I utgangspunktet var det planlagt å bruke en lokal PostgreSQL-database, men på grunn av min kjærlighet til AWS, bestemte jeg meg for å flytte databasen til skyen. I den neste artikkelen om dette emnet vil jeg vise hvordan du implementerer hele systemet beskrevet ovenfor i AWS ved hjelp av AWS Kinesis og AWS EMR. Følg med på nyhetene!