Haai Habr! Ons sal vandag 'n stelsel bou wat Apache Kafka-boodskapstrome sal verwerk deur Spark Streaming te gebruik en die verwerkingsresultaat na die AWS RDS-wolkdatabasis sal skryf.
Stel jou voor dat 'n sekere kredietinstelling ons die taak stel om inkomende transaksies "on-die-vlieg" vir al sy takke te verwerk. Dit kan gedoen word met die doel om vinnig die oop geldeenheidposisie vir die tesourie, limiete of finansiële resultaat op transaksies, ens.
Hoe om hierdie geval te implementeer sonder die gebruik van towerkrag en towerspreuke - lees onder die snit! Gaan!
Natuurlik bied intydse verwerking van 'n groot hoeveelheid data genoeg geleenthede vir gebruik in moderne stelsels. Een van die gewildste kombinasies hiervoor is die tandem van Apache Kafka en Spark Streaming, waar Kafka 'n stroom van inkomende boodskappakkies skep, en Spark Streaming hierdie pakkies op 'n bepaalde tydsinterval verwerk.
Om die fouttoleransie van die toepassing te verbeter, sal ons kontrolepunte - kontrolepunte gebruik. Met hierdie meganisme, wanneer die Spark Streaming-module verlore data moet herstel, hoef dit net na die laaste kontrolepunt terug te keer en van daar af berekeninge te hervat.
Die argitektuur van die ontwikkelde stelsel
Gebruikte komponente:
Apache Kafka is 'n verspreide publiseer-en-teken-boodskapstelsel. Geskik vir beide vanlyn en aanlyn boodskapverbruik. Om dataverlies te voorkom, word Kafka-boodskappe op skyf gestoor en binne die groep gerepliseer. Die Kafka-stelsel is bo-op die ZooKeeper-sinchronisasiediens gebou;
Apache Spark Streaming - 'n Spark-komponent vir die verwerking van stroomdata. Die Spark Streaming-module word gebou deur 'n mikro-batch-argitektuur te gebruik, wanneer 'n datastroom geïnterpreteer word as 'n aaneenlopende reeks klein datapakkies. Spark Streaming neem data uit verskillende bronne en kombineer dit in klein bondels. Nuwe pakkette word met gereelde tussenposes geskep. Aan die begin van elke tydinterval word 'n nuwe pakkie geskep, en enige data wat gedurende daardie interval ontvang word, word by die pakkie ingesluit. Aan die einde van die interval stop pakkiegroei. Die grootte van die interval word bepaal deur 'n parameter wat die bondelinterval genoem word;
Apache Spark SQL - Kombineer relasionele verwerking met Spark funksionele programmering. Gestruktureerde data verwys na data wat 'n skema het, dit wil sê 'n enkele stel velde vir alle rekords. Spark SQL ondersteun insette van 'n verskeidenheid gestruktureerde databronne en, as gevolg van die teenwoordigheid van skema-inligting, kan dit doeltreffend net die vereiste velde van rekords ophaal, en verskaf ook DataFrame API's;
AWS RDS is 'n relatief goedkoop wolkgebaseerde relasionele databasis, 'n webdiens wat opstelling, werking en skaal vereenvoudig, direk deur Amazon geadministreer.
Installeer en bestuur die Kafka-bediener
Voordat jy Kafka direk gebruik, moet jy seker maak dat jy Java het, want JVM word gebruik vir werk:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Die volgende stap is opsioneel. Die feit is dat die verstekinstellings jou nie toelaat om al die kenmerke van Apache Kafka ten volle te gebruik nie. Vee byvoorbeeld 'n onderwerp, kategorie, groep uit waarop boodskappe gepubliseer kan word. Om dit te verander, kom ons wysig die konfigurasielêer:
vim ~/kafka/config/server.properties
Voeg die volgende aan die einde van die lêer:
delete.topic.enable = true
Voordat jy die Kafka-bediener begin, moet jy die ZooKeeper-bediener begin, ons sal die helper script gebruik wat saam met die Kafka verspreiding kom:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Nadat ZooKeeper suksesvol begin het, begin ons die Kafka-bediener in 'n aparte terminaal:
Kom ons mis die oomblikke van toetsing van die produsent en verbruiker vir die nuutgeskepte onderwerp. Meer besonderhede oor hoe jy die stuur en ontvang van boodskappe kan toets, is in die amptelike dokumentasie geskryf - Stuur 'n paar boodskappe. Wel, ons gaan voort om 'n vervaardiger in Python te skryf met behulp van die KafkaProducer API.
Vervaardiger skryf
Die vervaardiger sal ewekansige data genereer - 100 boodskappe elke sekonde. Met ewekansige data bedoel ons 'n woordeboek wat uit drie velde bestaan:
Tak — naam van die verkoopspunt van die kredietinstelling;
Geld — transaksiegeldeenheid;
bedrag - Transaksie Bedrag. Die bedrag sal positief wees as dit 'n aankoop van valuta deur die Bank is, en negatief as dit 'n verkoop is.
Volgende, met behulp van die stuurmetode, stuur ons 'n boodskap na die bediener, na die onderwerp wat ons benodig, in JSON-formaat:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Wanneer die skrip uitgevoer word, kry ons die volgende boodskappe in die terminale:
Dit beteken alles werk soos ons wou – die vervaardiger genereer en stuur boodskappe na die onderwerp wat ons nodig het.
Die volgende stap is om Spark te installeer en hierdie boodskapvloei te verwerk.
Installeer Apache Spark
Apache Spark is 'n veelsydige en hoë werkverrigting groeprekenaarplatform.
Spark presteer beter as gewilde implementerings van die MapReduce-model in terme van werkverrigting, terwyl dit ondersteuning bied vir 'n wyer reeks berekeningtipes, insluitend interaktiewe navrae en stroming. Spoed speel 'n belangrike rol wanneer groot hoeveelhede data verwerk word, aangesien dit spoed is wat jou toelaat om interaktief te werk sonder om minute of ure te wag. Een van Spark se grootste sterkpunte om hierdie spoed te lewer, is sy vermoë om in-geheue-berekeninge uit te voer.
Hierdie raamwerk is in Scala geskryf, so jy moet dit eers installeer:
sudo apt-get install scala
Laai die Spark-verspreiding van die amptelike webwerf af:
Voer die opdrag hieronder uit nadat jy veranderinge aan bashrc gemaak het:
source ~/.bashrc
Ontplooi AWS PostgreSQL
Dit bly om die databasis te ontplooi, waar ons die verwerkte inligting van die strome sal invul. Om dit te doen, sal ons die AWS RDS-diens gebruik.
Gaan na die AWS-konsole -> AWS RDS -> Databasisse -> Skep databasis:
Kies PostgreSQL en klik op die volgende knoppie:
Omdat hierdie voorbeeld word uitsluitlik vir opvoedkundige doeleindes ontleed, ons sal 'n gratis bediener gebruik "ten minste" (Free Tier):
Merk dan die blokkie Free Tier, en daarna sal ons outomaties 'n instansie van die t2.micro-klas aangebied word - hoewel swak, maar gratis en redelik geskik vir ons taak:
Baie belangrike dinge volg: die naam van die DB-instansie, die naam van die meestergebruiker en sy wagwoord. Kom ons noem die instansie: myHabrTest, meestergebruiker: habr, wagwoord: habr12345 en klik op die volgende knoppie:
Die volgende bladsy bevat die parameters wat verantwoordelik is vir die toeganklikheid van ons databasisbediener van buite (Publieke toeganklikheid) en die beskikbaarheid van poorte:
Kom ons skep 'n nuwe instelling vir die VPC-sekuriteitsgroep, wat eksterne toegang tot ons databasisbediener deur poort 5432 (PostgreSQL) sal toelaat.
Kom ons gaan in 'n aparte blaaiervenster na die AWS-konsole in die VPC Dashboard -> Sekuriteitsgroepe -> Skep sekuriteitsgroep-afdeling:
Ons stel die naam vir die sekuriteitsgroep - PostgreSQL, 'n beskrywing, spesifiseer met watter VPC hierdie groep geassosieer moet word en klik op die Skep-knoppie:
Ons vul in vir die nuutgeskepte groep Inkomende reëls vir poort 5432, soos in die prentjie hieronder getoon. Jy kan nie die poort met die hand spesifiseer nie, maar kies PostgreSQL uit die Tipe-aftreklys.
Streng gesproke beteken die waarde ::/0 die beskikbaarheid van inkomende verkeer vir die bediener van regoor die wêreld, wat nie heeltemal kanonies waar is nie, maar om die voorbeeld te ontleed, kom ons gebruik hierdie benadering:
Ons keer terug na die blaaierbladsy, waar ons "Konfigureer gevorderde instellings" oop het en kies VPC-sekuriteitsgroepe -> Kies bestaande VPC-sekuriteitsgroepe -> PostgreSQL in die afdeling:
Volgende, in die afdeling Databasis opsies -> Databasis naam -> stel die naam - habrDB.
Ons kan die res van die parameters verlaat, met die uitsondering van die deaktivering van rugsteun (rugsteunbehoudtydperk - 0 dae), monitering en prestasie-insigte, by verstek. Klik op die knoppie Skep databasis:
Stroom hanteerder
Die laaste fase sal die ontwikkeling van 'n Spark-werk wees, wat elke twee sekondes nuwe data van Kafka sal verwerk en die resultaat in die databasis sal invoer.
Soos hierbo genoem, is kontrolepunte die hoofmeganisme in SparkStreaming wat gekonfigureer moet word om fouttoleransie te verskaf. Ons sal kontrolepunte gebruik en, in die geval van 'n prosedure mislukking, sal die Spark Streaming-module net hoef terug te keer na die laaste kontrolepunt en berekeninge daaruit te hervat om die verlore data te herwin.
'n Kontrolepunt kan geaktiveer word deur 'n gids op 'n fouttolerante, betroubare lêerstelsel (bv. HDFS, S3, ens.) te stel waar die kontrolepuntinligting gestoor sal word. Dit word gedoen met byvoorbeeld:
streamingContext.checkpoint(checkpointDirectory)
In ons voorbeeld sal ons die volgende benadering gebruik, naamlik, as die kontrolepuntDirectory bestaan, sal die konteks herskep word vanaf die kontrolepuntdata. As die gids nie bestaan nie (d.w.s. dit word vir die eerste keer uitgevoer), dan word die functionToCreateContext-funksie opgeroep om 'n nuwe konteks te skep en DStreams op te stel:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Ons skep 'n DirectStream-objek om aan die "transaksie"-onderwerp te koppel deur die createDirectStream-metode van die KafkaUtils-biblioteek te gebruik:
Deur Spark SQL te gebruik, doen ons 'n eenvoudige groepering en voer die resultaat uit na die konsole:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Kry die navraagliggaam en voer dit deur Spark SQL:
En dan stoor ons die ontvangde saamgevoegde data in 'n tabel in AWS RDS. Om die resultate van die samevoeging na 'n databasistabel te stoor, sal ons die skryfmetode van die DataFrame-objek gebruik:
'n Paar woorde oor die opstel van 'n verbinding met AWS RDS. Ons het die gebruiker en wagwoord daarvoor geskep by die “Deploying AWS PostgreSQL”-stap. As die url van die databasisbediener moet jy die Eindpunt gebruik, wat in die Verbindings- en sekuriteitsafdeling vertoon word:
Om Spark en Kafka korrek te verbind, moet jy die taak deur smark-submit met die artefak laat loop vonk-stroom-kafka-0-8_2.11. Daarbenewens sal ons ook 'n artefak gebruik vir interaksie met die PostgreSQL-databasis, ons sal dit deur --pakkette stuur.
Vir buigsaamheid van die skrif, sal ons ook die naam van die boodskapbediener en die onderwerp waaruit ons data wil ontvang as invoerparameters uithaal.
Dit is dus tyd om die stelsel uit te voer en te toets:
Alles het uitgewerk! Soos u in die prentjie hieronder kan sien, terwyl die toepassing loop, word nuwe samevoegingsresultate elke 2 sekondes vertoon, want ons het die bondelinterval op 2 sekondes gestel toe ons die StreamingContext-objek geskep het:
Vervolgens maak ons 'n eenvoudige navraag na die databasis om te kyk vir rekords in die tabel transaksie_vloei:
Gevolgtrekking
In hierdie artikel is 'n voorbeeld van stroominligtingverwerking met behulp van Spark Streaming in samewerking met Apache Kafka en PostgreSQL oorweeg. Met die groei in volumes data uit verskeie bronne, is dit moeilik om die praktiese waarde van Spark Streaming vir die skep van intydse en stroomtoepassings te oorskat.
U kan die volledige bronkode in my bewaarplek vind by GitHub.
Ek is bly om hierdie artikel te bespreek, ek sien uit na u kommentaar, en ek hoop ook vir opbouende kritiek van alle betrokke lesers.
Ek wens jou sukses toe!
Ps. Dit was oorspronklik beplan om 'n plaaslike PostgreSQL-databasis te gebruik, maar gegewe my liefde vir AWS, het ek besluit om die databasis na die wolk te skuif. In die volgende artikel oor hierdie onderwerp, sal ek jou wys hoe om die hele stelsel wat hierbo beskryf word in AWS te implementeer deur gebruik te maak van AWS Kinesis en AWS EMR. Volg die nuus!