ProHoster > Blog > administratë > Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming
Apache Kafka dhe përpunimi i të dhënave të transmetimit me Spark Streaming
Përshëndetje, Habr! Sot do të ndërtojmë një sistem që do të përpunojë transmetimet e mesazheve Apache Kafka duke përdorur Spark Streaming dhe do t'i shkruajë rezultatet e përpunimit në bazën e të dhënave AWS RDS cloud.
Le të imagjinojmë se një institucion i caktuar krediti na vendos detyrën për të përpunuar transaksionet hyrëse “në fluturim” në të gjitha degët e tij. Kjo mund të bëhet për qëllimin e llogaritjes së menjëhershme të një pozicioni të hapur valutor për thesarin, kufijtë ose rezultatet financiare për transaksionet, etj.
Si ta zbatoni këtë rast pa përdorimin e magjive dhe magjive - lexoni nën prerje! Shkoni!
Natyrisht, përpunimi i një sasie të madhe të dhënash në kohë reale ofron mundësi të shumta për përdorim në sistemet moderne. Një nga kombinimet më të njohura për këtë është tandemi i Apache Kafka dhe Spark Streaming, ku Kafka krijon një rrjedhë të paketave të mesazheve hyrëse dhe Spark Streaming i përpunon këto pako në një interval të caktuar kohor.
Për të rritur tolerancën ndaj gabimeve të aplikacionit, ne do të përdorim pikat e kontrollit. Me këtë mekanizëm, kur motori Spark Streaming duhet të rikuperojë të dhënat e humbura, duhet vetëm të kthehet në pikën e fundit të kontrollit dhe të rifillojë llogaritjet prej andej.
Arkitektura e sistemit të zhvilluar
Komponentët e përdorur:
Apache Kafka është një sistem i shpërndarë mesazhesh publikim-subscribe. I përshtatshëm për konsumimin e mesazheve offline dhe online. Për të parandaluar humbjen e të dhënave, mesazhet e Kafkës ruhen në disk dhe riprodhohen brenda grupit. Sistemi Kafka është ndërtuar në krye të shërbimit të sinkronizimit ZooKeeper;
Apache Spark Streaming - Komponenti Spark për përpunimin e të dhënave të transmetimit. Moduli Spark Streaming është ndërtuar duke përdorur një arkitekturë mikro-batch, ku rrjedha e të dhënave interpretohet si një sekuencë e vazhdueshme e paketave të vogla të të dhënave. Spark Streaming merr të dhëna nga burime të ndryshme dhe i kombinon ato në paketa të vogla. Paketat e reja krijohen në intervale të rregullta. Në fillim të çdo intervali kohor, krijohet një paketë e re dhe çdo e dhënë e marrë gjatë atij intervali përfshihet në paketë. Në fund të intervalit, rritja e paketave ndalon. Madhësia e intervalit përcaktohet nga një parametër i quajtur intervali i grupit;
Apache Spark SQL - kombinon përpunimin relacional me programimin funksional Spark. Të dhëna të strukturuara nënkuptojnë të dhëna që kanë një skemë, domethënë një grup të vetëm fushash për të gjitha regjistrimet. Spark SQL mbështet të dhëna nga një shumëllojshmëri burimesh të strukturuara të të dhënave dhe, falë disponueshmërisë së informacionit të skemës, mund të marrë në mënyrë efikase vetëm fushat e kërkuara të regjistrimeve, dhe gjithashtu ofron DataFrame API;
AWS RDS është një bazë të dhënash relacionale relativisht e lirë e bazuar në cloud, shërbim ueb që thjeshton konfigurimin, funksionimin dhe shkallëzimin dhe administrohet drejtpërdrejt nga Amazon.
Instalimi dhe ekzekutimi i serverit Kafka
Përpara se të përdorni Kafka direkt, duhet të siguroheni që keni Java, sepse... JVM përdoret për punë:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Hapi tjetër është fakultativ. Fakti është se cilësimet e paracaktuara nuk ju lejojnë të përdorni plotësisht të gjitha tiparet e Apache Kafka. Për shembull, fshini një temë, kategori, grup në të cilin mund të publikohen mesazhet. Për ta ndryshuar këtë, le të modifikojmë skedarin e konfigurimit:
vim ~/kafka/config/server.properties
Shtoni sa vijon në fund të skedarit:
delete.topic.enable = true
Përpara se të nisni serverin Kafka, duhet të nisni serverin ZooKeeper; ne do të përdorim skriptin ndihmës që vjen me shpërndarjen Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Pasi ZooKeeper të ketë filluar me sukses, hapni serverin Kafka në një terminal të veçantë:
Le të humbasim momentet e testimit të prodhuesit dhe konsumatorit për temën e krijuar rishtazi. Më shumë detaje se si mund të testoni dërgimin dhe marrjen e mesazheve janë shkruar në dokumentacionin zyrtar - Dërgo disa mesazhe. Epo, ne vazhdojmë të shkruajmë një prodhues në Python duke përdorur API-në e KafkaProducer.
Shkrimi i prodhuesit
Prodhuesi do të gjenerojë të dhëna të rastësishme - 100 mesazhe çdo sekondë. Me të dhëna të rastësishme nënkuptojmë një fjalor të përbërë nga tre fusha:
Degë - emri i pikës së shitjes së institucionit të kreditit;
Monedhë — valuta e transaksionit;
Sasia - shuma e transaksionit. Shuma do të jetë një numër pozitiv nëse është një blerje valutë nga Banka dhe një numër negativ nëse është një shitje.
Më pas, duke përdorur metodën e dërgimit, ne dërgojmë një mesazh në server, në temën që na nevojitet, në formatin JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Kur ekzekutojmë skriptin, marrim mesazhet e mëposhtme në terminal:
Kjo do të thotë që gjithçka funksionon siç kemi dashur - prodhuesi gjeneron dhe dërgon mesazhe në temën që na nevojitet.
Hapi tjetër është të instaloni Spark dhe të përpunoni këtë transmetim mesazhesh.
Instalimi i Apache Spark
Apache Spark është një platformë kompjuterike e grupeve universale dhe me performancë të lartë.
Spark performon më mirë se implementimet e njohura të modelit MapReduce ndërsa mbështet një gamë më të gjerë të llojeve të llogaritjes, duke përfshirë pyetjet interaktive dhe përpunimin e transmetimit. Shpejtësia luan një rol të rëndësishëm gjatë përpunimit të sasive të mëdha të të dhënave, pasi është shpejtësia që ju lejon të punoni në mënyrë interaktive pa shpenzuar minuta ose orë në pritje. Një nga pikat më të forta të Spark që e bën atë kaq të shpejtë është aftësia e tij për të kryer llogaritjet në memorie.
Ky kuadër është shkruar në Scala, kështu që duhet ta instaloni së pari:
sudo apt-get install scala
Shkarkoni shpërndarjen e Spark nga faqja zyrtare e internetit:
Ekzekutoni komandën më poshtë pasi të bëni ndryshime në bashrc:
source ~/.bashrc
Vendosja e AWS PostgreSQL
E tëra që mbetet është të vendosim bazën e të dhënave në të cilën do të ngarkojmë informacionin e përpunuar nga transmetimet. Për këtë ne do të përdorim shërbimin AWS RDS.
Shkoni te tastiera AWS -> AWS RDS -> Bazat e të dhënave -> Krijoni bazën e të dhënave:
Zgjidhni PostgreSQL dhe klikoni Next:
Sepse Ky shembull është vetëm për qëllime edukative; ne do të përdorim një server falas "në minimum" (Free Tier):
Më pas, vendosim një shenjë në bllokun Free Tier, dhe pas kësaj do të na ofrohet automatikisht një shembull i klasës t2.micro - megjithëse i dobët, ai është falas dhe mjaft i përshtatshëm për detyrën tonë:
Më pas vijnë gjëra shumë të rëndësishme: emri i shembullit të bazës së të dhënave, emri i përdoruesit kryesor dhe fjalëkalimi i tij. Le të emërtojmë shembullin: myHabrTest, përdoruesi kryesor: habr, fjalëkalimi: habr12345 dhe klikoni në butonin Next:
Në faqen tjetër ka parametra përgjegjës për aksesin e serverit tonë të bazës së të dhënave nga jashtë (Qasshmëria publike) dhe disponueshmëria e portit:
Le të krijojmë një cilësim të ri për grupin e sigurisë VPC, i cili do të lejojë akses të jashtëm në serverin tonë të bazës së të dhënave nëpërmjet portit 5432 (PostgreSQL).
Le të shkojmë në tastierën AWS në një dritare të veçantë të shfletuesit te paneli VPC -> Grupet e Sigurisë -> Krijo seksionin e grupit të sigurisë:
Ne vendosëm emrin për grupin e Sigurisë - PostgreSQL, një përshkrim, tregoni se me cilin VPC duhet të lidhet ky grup dhe klikoni butonin Krijo:
Plotësoni rregullat Inbound për portin 5432 për grupin e sapokrijuar, siç tregohet në foton më poshtë. Ju nuk mund ta specifikoni portin me dorë, por zgjidhni PostgreSQL nga lista rënëse Lloji.
Në mënyrë të rreptë, vlera ::/0 nënkupton disponueshmërinë e trafikut hyrës në server nga e gjithë bota, gjë që kanonikisht nuk është plotësisht e vërtetë, por për të analizuar shembullin, le t'i lejojmë vetes të përdorim këtë qasje:
Kthehemi në faqen e shfletuesit, ku kemi hapur “Configure advanced settings” dhe zgjedhim në seksionin e grupeve të sigurisë VPC -> Zgjidhni grupet ekzistuese të sigurisë VPC -> PostgreSQL:
Më pas, në opsionet e bazës së të dhënave -> Emri i bazës së të dhënave -> vendosni emrin - habrDB.
Mund t'i lëmë parametrat e mbetur, me përjashtim të çaktivizimit të rezervimit (periudha e ruajtjes së rezervës - 0 ditë), monitorimit dhe Vështrimeve të Performancës, si parazgjedhje. Klikoni në butonin Krijo bazën e të dhënave:
Mbajtës i fijeve
Faza përfundimtare do të jetë zhvillimi i një pune Spark, e cila do të përpunojë të dhëna të reja që vijnë nga Kafka çdo dy sekonda dhe do të futë rezultatin në bazën e të dhënave.
Siç u përmend më lart, pikat e kontrollit janë një mekanizëm thelbësor në SparkStreaming që duhet të konfigurohet për të siguruar tolerancën e gabimeve. Ne do të përdorim pikat e kontrollit dhe, nëse procedura dështon, moduli Spark Streaming do të duhet vetëm të kthehet në pikën e fundit të kontrollit dhe të rifillojë llogaritjet prej tij për të rikuperuar të dhënat e humbura.
Pika e kontrollit mund të aktivizohet duke vendosur një direktori në një sistem skedari tolerant ndaj gabimeve dhe të besueshëm (si HDFS, S3, etj.) në të cilin do të ruhen informacionet e pikës së kontrollit. Kjo bëhet duke përdorur, për shembull:
streamingContext.checkpoint(checkpointDirectory)
Në shembullin tonë, ne do të përdorim qasjen e mëposhtme, domethënë, nëse ekziston Lista e pikave të kontrollit, atëherë konteksti do të rikrijohet nga të dhënat e pikës së kontrollit. Nëse drejtoria nuk ekziston (d.m.th. ekzekutohet për herë të parë), atëherë funksioniToCreateContext thirret për të krijuar një kontekst të ri dhe për të konfiguruar DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Ne krijojmë një objekt DirectStream për t'u lidhur me temën e "transaksionit" duke përdorur metodën createDirectStream të bibliotekës KafkaUtils:
Duke përdorur Spark SQL, ne bëjmë një grupim të thjeshtë dhe shfaqim rezultatin në tastierë:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Marrja e tekstit të pyetjes dhe ekzekutimi i tij përmes Spark SQL:
Dhe pastaj ne i ruajmë të dhënat e grumbulluara që rezultojnë në një tabelë në AWS RDS. Për të ruajtur rezultatet e grumbullimit në një tabelë të bazës së të dhënave, ne do të përdorim metodën e shkrimit të objektit DataFrame:
Disa fjalë për vendosjen e një lidhjeje me AWS RDS. Ne krijuam përdoruesin dhe fjalëkalimin për të në hapin "Përdorimi i AWS PostgreSQL". Ju duhet të përdorni Endpoint si url-në e serverit të bazës së të dhënave, e cila shfaqet në seksionin Lidhshmëria dhe siguria:
Në mënyrë që të lidhni saktë Spark dhe Kafka, duhet ta kryeni punën përmes smark-submit duke përdorur objektin spark-streaming-kafka-0-8_2.11. Përveç kësaj, ne do të përdorim gjithashtu një objekt për të bashkëvepruar me bazën e të dhënave PostgreSQL; ne do t'i transferojmë ato përmes --paketave.
Për fleksibilitetin e skriptit, ne do të përfshijmë gjithashtu si parametra hyrës emrin e serverit të mesazheve dhe temën nga e cila duam të marrim të dhëna.
Pra, është koha për të nisur dhe kontrolluar funksionalitetin e sistemit:
Gjithçka funksionoi! Siç mund ta shihni në foton më poshtë, ndërsa aplikacioni është duke u ekzekutuar, rezultatet e reja të grumbullimit dalin çdo 2 sekonda, sepse ne e vendosëm intervalin e grupit në 2 sekonda kur krijuam objektin StreamingContext:
Më pas, ne bëjmë një pyetje të thjeshtë në bazën e të dhënave për të kontrolluar praninë e të dhënave në tabelë rrjedha_transaksioni:
Përfundim
Ky artikull shikoi një shembull të përpunimit të rrjedhës së informacionit duke përdorur Spark Streaming në lidhje me Apache Kafka dhe PostgreSQL. Me rritjen e të dhënave nga burime të ndryshme, është e vështirë të mbivlerësohet vlera praktike e Spark Streaming për krijimin e aplikacioneve të transmetimit dhe në kohë reale.
Mund ta gjeni kodin e plotë burimor në depon time në GitHub.
Jam i lumtur të diskutoj këtë artikull, pres me padurim komentet tuaja dhe gjithashtu shpresoj për kritika konstruktive nga të gjithë lexuesit e kujdesshëm.
Ju uroj suksese!
. Fillimisht ishte planifikuar të përdorja një bazë të dhënash lokale PostgreSQL, por duke pasur parasysh dashurinë time për AWS, vendosa ta zhvendos bazën e të dhënave në cloud. Në artikullin tjetër mbi këtë temë, unë do të tregoj se si të zbatohet i gjithë sistemi i përshkruar më sipër në AWS duke përdorur AWS Kinesis dhe AWS EMR. Ndiqni lajmet!