Apache Kafka og streaming databehandling med Spark Streaming

Hei, Habr! I dag skal vi bygge et system som vil behandle Apache Kafka-meldingsstrømmer ved hjelp av Spark Streaming og skrive behandlingsresultatene til AWS RDS-skydatabasen.

La oss forestille oss at en bestemt kredittinstitusjon setter oss i oppgave å behandle innkommende transaksjoner "i farten" på tvers av alle sine filialer. Dette kan gjøres med det formål å raskt beregne en åpen valutaposisjon for statskassen, rammer eller økonomiske resultater for transaksjoner osv.

Hvordan implementere denne saken uten bruk av magi og magiske trollformler - les under kuttet! Gå!

Apache Kafka og streaming databehandling med Spark Streaming
(Bildekilde)

Innledning

Å behandle en stor mengde data i sanntid gir selvsagt gode muligheter for bruk i moderne systemer. En av de mest populære kombinasjonene for dette er tandem av Apache Kafka og Spark Streaming, der Kafka lager en strøm av innkommende meldingspakker, og Spark Streaming behandler disse pakkene ved et gitt tidsintervall.

For å øke feiltoleransen til applikasjonen vil vi bruke sjekkpunkter. Med denne mekanismen, når Spark Streaming-motoren trenger å gjenopprette tapte data, trenger den bare å gå tilbake til det siste sjekkpunktet og gjenoppta beregningene derfra.

Arkitektur av det utviklede systemet

Apache Kafka og streaming databehandling med Spark Streaming

Komponenter som brukes:

  • Apache Kafka er et distribuert publiser-abonner meldingssystem. Egnet for både offline og online meldingsforbruk. For å forhindre tap av data lagres Kafka-meldinger på disk og replikeres i klyngen. Kafka-systemet er bygget på toppen av ZooKeeper-synkroniseringstjenesten;
  • Apache Spark Streaming - Spark-komponent for behandling av strømmedata. Spark Streaming-modulen er bygget ved hjelp av en mikrobatch-arkitektur, hvor datastrømmen tolkes som en kontinuerlig sekvens av små datapakker. Spark Streaming tar data fra forskjellige kilder og kombinerer dem til små pakker. Nye pakker opprettes med jevne mellomrom. I begynnelsen av hvert tidsintervall opprettes en ny pakke, og alle data som mottas i løpet av det intervallet er inkludert i pakken. På slutten av intervallet stopper pakkeveksten. Størrelsen på intervallet bestemmes av en parameter kalt batchintervall;
  • Apache Spark SQL - kombinerer relasjonsbehandling med Spark funksjonell programmering. Strukturerte data betyr data som har et skjema, det vil si et enkelt sett med felt for alle poster. Spark SQL støtter input fra en rekke strukturerte datakilder, og takket være tilgjengeligheten av skjemainformasjon kan den effektivt hente bare de nødvendige postfeltene, og gir også DataFrame APIer;
  • AWS RDS er en relativt rimelig skybasert relasjonsdatabase, webtjeneste som forenkler oppsett, drift og skalering, og administreres direkte av Amazon.

Installere og kjøre Kafka-serveren

Før du bruker Kafka direkte, må du sørge for at du har Java, fordi... JVM brukes til arbeid:

sudo apt-get update 
sudo apt-get install default-jre
java -version

La oss opprette en ny bruker for å jobbe med Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Last deretter ned distribusjonen fra det offisielle Apache Kafka-nettstedet:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Pakk ut det nedlastede arkivet:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Det neste trinnet er valgfritt. Faktum er at standardinnstillingene ikke lar deg bruke alle funksjonene til Apache Kafka fullt ut. Slett for eksempel et emne, kategori, gruppe som meldinger kan publiseres til. For å endre dette, la oss redigere konfigurasjonsfilen:

vim ~/kafka/config/server.properties

Legg til følgende på slutten av filen:

delete.topic.enable = true

Før du starter Kafka-serveren, må du starte ZooKeeper-serveren; vi vil bruke hjelpeskriptet som følger med Kafka-distribusjonen:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Etter at ZooKeeper har startet vellykket, start Kafka-serveren i en separat terminal:

bin/kafka-server-start.sh config/server.properties

La oss lage et nytt emne kalt Transaksjon:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

La oss sørge for at et emne med det nødvendige antallet partisjoner og replikering er opprettet:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka og streaming databehandling med Spark Streaming

La oss gå glipp av øyeblikkene med å teste produsenten og forbrukeren for det nyopprettede emnet. Flere detaljer om hvordan du kan teste sending og mottak av meldinger er skrevet i den offisielle dokumentasjonen - Send noen meldinger. Vel, vi går videre til å skrive en produsent i Python ved å bruke KafkaProducer API.

Produsent skriver

Produsenten vil generere tilfeldige data - 100 meldinger hvert sekund. Med tilfeldige data mener vi en ordbok som består av tre felt:

  • Branch – navnet på kredittinstitusjonens salgssted;
  • valuta — transaksjonsvaluta;
  • Beløp - transaksjons beløp. Beløpet vil være et positivt tall hvis det er et kjøp av valuta fra banken, og et negativt tall hvis det er et salg.

Koden for produsenten ser slik ut:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Deretter, ved å bruke sendemetoden, sender vi en melding til serveren, til emnet vi trenger, i JSON-format:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Når vi kjører skriptet, mottar vi følgende meldinger i terminalen:

Apache Kafka og streaming databehandling med Spark Streaming

Det betyr at alt fungerer som vi ønsket – produsenten genererer og sender meldinger til det temaet vi trenger.
Neste trinn er å installere Spark og behandle denne meldingsstrømmen.

Installerer Apache Spark

Apache Spark er en universell og høyytelses cluster computing-plattform.

Spark yter bedre enn populære implementeringer av MapReduce-modellen samtidig som den støtter et bredere spekter av beregningstyper, inkludert interaktive spørringer og strømbehandling. Hastighet spiller en viktig rolle ved behandling av store datamengder, siden det er hastighet som lar deg jobbe interaktivt uten å bruke minutter eller timer på å vente. En av Sparks største styrker som gjør den så rask er evnen til å utføre minneberegninger.

Dette rammeverket er skrevet i Scala, så du må installere det først:

sudo apt-get install scala

Last ned Spark-distribusjonen fra den offisielle nettsiden:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Pakk ut arkivet:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Legg til banen til Spark i bash-filen:

vim ~/.bashrc

Legg til følgende linjer gjennom editoren:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Kjør kommandoen nedenfor etter å ha gjort endringer i bashrc:

source ~/.bashrc

Distribuerer AWS PostgreSQL

Alt som gjenstår er å distribuere databasen der vi skal laste opp den behandlede informasjonen fra strømmene. Til dette vil vi bruke AWS RDS-tjenesten.

Gå til AWS-konsollen -> AWS RDS -> Databaser -> Opprett database:
Apache Kafka og streaming databehandling med Spark Streaming

Velg PostgreSQL og klikk Neste:
Apache Kafka og streaming databehandling med Spark Streaming

Fordi Dette eksemplet er kun for pedagogiske formål; vi vil bruke en gratis server "minst" (gratis nivå):
Apache Kafka og streaming databehandling med Spark Streaming

Deretter setter vi en hake i Free Tier-blokken, og etter det vil vi automatisk bli tilbudt en forekomst av t2.micro-klassen - selv om den er svak, er den gratis og ganske egnet for oppgaven vår:
Apache Kafka og streaming databehandling med Spark Streaming

Deretter kommer svært viktige ting: navnet på databaseforekomsten, navnet på hovedbrukeren og passordet hans. La oss gi forekomsten navn: myHabrTest, hovedbruker: habr, passord: habr12345 og klikk på Neste-knappen:
Apache Kafka og streaming databehandling med Spark Streaming

På neste side er det parametere som er ansvarlige for tilgjengeligheten til databaseserveren vår fra utsiden (Offentlig tilgjengelighet) og porttilgjengelighet:

Apache Kafka og streaming databehandling med Spark Streaming

La oss lage en ny innstilling for VPC-sikkerhetsgruppen, som vil tillate ekstern tilgang til databaseserveren vår via port 5432 (PostgreSQL).
La oss gå til AWS-konsollen i et eget nettleservindu til VPC Dashboard -> Sikkerhetsgrupper -> Opprett sikkerhetsgruppe:
Apache Kafka og streaming databehandling med Spark Streaming

Vi setter navnet på sikkerhetsgruppen - PostgreSQL, en beskrivelse, indikerer hvilken VPC denne gruppen skal assosieres med og klikker på Opprett-knappen:
Apache Kafka og streaming databehandling med Spark Streaming

Fyll inn innkommende regler for port 5432 for den nyopprettede gruppen, som vist på bildet nedenfor. Du kan ikke spesifisere porten manuelt, men velg PostgreSQL fra rullegardinlisten Type.

Strengt tatt betyr verdien ::/0 tilgjengeligheten av innkommende trafikk til serveren fra hele verden, noe som kanonisk ikke er helt sant, men for å analysere eksemplet, la oss tillate oss å bruke denne tilnærmingen:
Apache Kafka og streaming databehandling med Spark Streaming

Vi går tilbake til nettlesersiden, hvor vi har "Konfigurer avanserte innstillinger" åpen og velger i delen VPC-sikkerhetsgrupper -> Velg eksisterende VPC-sikkerhetsgrupper -> PostgreSQL:
Apache Kafka og streaming databehandling med Spark Streaming

Deretter, i Databasealternativene -> Databasenavn -> angi navnet - habrDB.

Vi kan la de gjenværende parametrene, med unntak av deaktivering av sikkerhetskopiering (reserveoppbevaringsperiode - 0 dager), overvåking og ytelsesinnsikt, stå som standard. Klikk på knappen Lag database:
Apache Kafka og streaming databehandling med Spark Streaming

Trådbehandler

Den siste fasen vil være utviklingen av en Spark-jobb, som vil behandle nye data som kommer fra Kafka annethvert sekund og legge resultatet inn i databasen.

Som nevnt ovenfor er sjekkpunkter en kjernemekanisme i SparkStreaming som må konfigureres for å sikre feiltoleranse. Vi vil bruke sjekkpunkter, og hvis prosedyren mislykkes, trenger Spark Streaming-modulen bare å gå tilbake til det siste sjekkpunktet og gjenoppta beregningene fra det for å gjenopprette tapte data.

Kontrollpunkt kan aktiveres ved å sette en katalog på et feiltolerant, pålitelig filsystem (som HDFS, S3, etc.) der kontrollpunktinformasjonen vil bli lagret. Dette gjøres ved å for eksempel:

streamingContext.checkpoint(checkpointDirectory)

I vårt eksempel vil vi bruke følgende tilnærming, nemlig hvis checkpointDirectory eksisterer, vil konteksten bli gjenskapt fra kontrollpunktdataene. Hvis katalogen ikke eksisterer (dvs. kjøres for første gang), kalles functionToCreateContext for å opprette en ny kontekst og konfigurere DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Vi oppretter et DirectStream-objekt for å koble til "transaksjon"-emnet ved å bruke createDirectStream-metoden til KafkaUtils-biblioteket:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Parsing av innkommende data i JSON-format:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Ved å bruke Spark SQL gjør vi en enkel gruppering og viser resultatet i konsollen:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Hente søketeksten og kjøre den gjennom Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Og så lagrer vi de resulterende aggregerte dataene i en tabell i AWS RDS. For å lagre aggregeringsresultatene til en databasetabell, bruker vi skrivemetoden til DataFrame-objektet:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Noen få ord om å sette opp en tilkobling til AWS RDS. Vi opprettet brukeren og passordet for det i trinnet "Deploying AWS PostgreSQL". Du bør bruke Endpoint som databaseserver-url, som vises i delen Tilkobling og sikkerhet:

Apache Kafka og streaming databehandling med Spark Streaming

For å koble Spark og Kafka riktig, bør du kjøre jobben via smark-submit ved å bruke artefakten spark-streaming-kafka-0-8_2.11. I tillegg vil vi også bruke en artefakt for å samhandle med PostgreSQL-databasen; vi vil overføre dem via --packages.

For fleksibiliteten til skriptet vil vi også inkludere som inputparametere navnet på meldingsserveren og emnet som vi ønsker å motta data fra.

Så det er på tide å starte og sjekke systemets funksjonalitet:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Alt ordnet seg! Som du kan se på bildet nedenfor, mens applikasjonen kjører, sendes nye aggregeringsresultater ut hvert 2. sekund, fordi vi satte batchintervallet til 2 sekunder da vi opprettet StreamingContext-objektet:

Apache Kafka og streaming databehandling med Spark Streaming

Deretter gjør vi en enkel spørring til databasen for å sjekke tilstedeværelsen av poster i tabellen transaksjonsflyt:

Apache Kafka og streaming databehandling med Spark Streaming

Konklusjon

Denne artikkelen så på et eksempel på strømbehandling av informasjon ved bruk av Spark Streaming i forbindelse med Apache Kafka og PostgreSQL. Med veksten av data fra ulike kilder, er det vanskelig å overvurdere den praktiske verdien av Spark Streaming for å lage streaming og sanntidsapplikasjoner.

Du kan finne hele kildekoden i mitt depot på GitHub.

Jeg er glad for å diskutere denne artikkelen, jeg ser frem til dine kommentarer, og jeg håper også på konstruktiv kritikk fra alle omsorgsfulle lesere.

Jeg ønsker deg suksess!

Ps. I utgangspunktet var det planlagt å bruke en lokal PostgreSQL-database, men på grunn av min kjærlighet til AWS, bestemte jeg meg for å flytte databasen til skyen. I den neste artikkelen om dette emnet vil jeg vise hvordan du implementerer hele systemet beskrevet ovenfor i AWS ved hjelp av AWS Kinesis og AWS EMR. Følg med på nyhetene!

Kilde: www.habr.com

Legg til en kommentar